# Sprawdzenie dostępności bibliotek Delta Lake

Na początku wykonaj poniższe paragrafy tworzące tabelę Delta Lake, jeśli wszystko się powiedzie możesz przejść do sekcji ***Wprowadzenie*** pominąć następną sekcję poświęconą konfiguracji. 
W przeciwnym razie wykonaj polecenia z sekcji ***Konfiguracja***.

In [1]:
import pyspark
from delta import *
from pyspark.sql.functions import col, explode, array

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-24012c40-b6af-4536-ab15-b049d61f5884;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.3.0 in central
	found io.delta#delta-storage;2.3.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
:: resolution report :: resolve 332ms :: artifacts dl 11ms
	:: modules in use:
	io.delta#delta-core_2.12;2.3.0 from central in [default]
	io.delta#delta-storage;2.3.0 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |   0   ||

In [2]:
data = (
    spark.range(0, 5)
    .selectExpr("id as x")
    .withColumn("y", explode(array(col("x"))))
    .select("x", "y")
)

# Zapis danych do formatu Delta
data.write.format("delta").mode("overwrite").save("/tmp/delta-table2")

24/12/26 12:37:37 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

# Wprowadzenie

## Skonfigurowanie danych źródłowych 

Zanim zaczniemy korzystać i poznawać funkcjonalności biblioteki Delta Lake skonfigurujmy tabelę z danymi źródłowymi. 
A następnie uruchom go tworząc tymczasową perspektywę na przygotowanych uprzednio danych źródłowych. 

In [3]:
source_df = (
    spark.read
    .option("header", True)
    .option("quote", "\"")
    .csv("/tmp/DeltaLakeSourceData")
)

# Wyświetlenie schematu ramki danych
source_df.printSchema()

# Utworzenie tymczasowej tabeli
source_df.createOrReplaceTempView("source_data")

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- effectiveDate: string (nullable = true)



 
Nasze dane źródłowe zawierają informacje na temat klientów z kolejnych okresów czasu.

Uruchom poniższe zapytanie wydobywające wersje danych, które były aktualne na dzień `2021-01-01`.

In [4]:
df = spark.sql("""
select id, name, address, zipcode, city, country, effectiveDate
from  (
    select id, name, address, zipcode, city, country, effectiveDate, 
           rank() over (partition by id order by to_date(effectiveDate,"dd-MM-yyyy") desc) as version
    from   source_data
    where  to_date(effectiveDate,"dd-MM-yyyy") < to_date("2021-01-01","yyyy-MM-dd")
    ) tab
where version = 1""")

In [5]:
df.toPandas()

                                                                                

Unnamed: 0,id,name,address,zipcode,city,country,effectiveDate
0,10,Jin Terry,467-8297 Enim,35633573,Balıkesir,Nigeria,06-11-2020
1,100,Harriet Rojas,Ap #810-8710 Enim. St.,84541,Lipetsk,Canada,21-10-2020
2,11,Isabelle Stevenson,131-4245 Eleifend. Street,16142,Hà Giang,Russian Federation,25-11-2020
3,14,Leo Mcleod,467-8297 Enim,39153,Borås,Germany,16-10-2020
4,16,Kaitlin Landry,623-5682 Augue St.,351225,Libramont-Chevigny,Indonesia,29-10-2020
5,19,Alden Harper,Ap #579-2185 Sed Street,94671-72608,Châtellerault,Nigeria,06-12-2020
6,2,Brandon Christian,476-5064 Suspendisse Rd.,93-765,Broxburn,Russian Federation,28-11-2020
7,20,Kathleen Pugh,7018 Cras St.,3123,Ostrowiec Świętokrzyski,Peru,16-11-2020
8,26,Ulysses Dillard,1318 Tempor Rd.,S5J 6Z2,Tuscaloosa,United States,11-10-2020
9,28,Shaine Puckett,Ap #579-2185 Sed Street,85629,Cochrane,Poland,07-12-2020


W poniższych zadaniach możesz skorzystać zarówno z interfejsu w Scali jak i SQL. 

Wszystko zależy od Twoich preferencji.

Uwaga! W przypadku korzystania z SQL i wskazywania ścieżek, konieczne jest wykorzystywanie schematu `hdfs`.

Przykładowo `hdfs:/tmp/delta-customers`

## DDL

### Zadanie 1

Utwórz pustą tabelę *Delta Lake* o nazwie `customers`, której lokalizacją będzie `/tmp/delta-customers` (lub `hdfs:/tmp/delta-customers`). 

Kolumny tabeli muszą odpowiadać kolumnom danych źródłowych. 
Wszystkie kolumny powinny być ciągami znaków o długości do 200 znaków

Jeśli uruchamiasz ten notatnik po raz kolejny. Usuń zawartość katalogu z danymi tabeli Delta Lake wywołując poniższe polecenie

In [6]:
%%sh
if hadoop fs -test -e /tmp/delta-customers; then
  hadoop fs -rm -r /tmp/delta-customers
else
  echo "Directory already removed"
fi

Deleted /tmp/delta-customers


### Rozwiązanie zadania 1

In [7]:
spark.sql("DROP TABLE IF EXISTS customers")

ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used


DataFrame[]

In [8]:
spark.sql("""
CREATE TABLE customers
(
  id   VARCHAR(200),
  name VARCHAR(200),
  address VARCHAR(200),
  zipcode VARCHAR(200),
  city VARCHAR(200),
  country VARCHAR(200),
  effectiveDate VARCHAR(200)
)
USING DELTA
LOCATION 'hdfs:/tmp/delta-customers';
""")


24/12/26 12:38:43 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `default`.`customers` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
24/12/26 12:38:43 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.


DataFrame[]

## DML

### Zadanie 2

Wprowadź do utworzonej przez Ciebie tabeli dane o naszych klientach obowiązujące na dzień `2021-01-01`.

### Rozwiązanie zadania 2

In [9]:
spark.sql("""
INSERT INTO customers
(id, name, address, zipcode, city, country, effectiveDate)
SELECT id, name, address, zipcode, city, country, effectiveDate
FROM (
    SELECT id, name, address, zipcode, city, country, effectiveDate, 
           rank() OVER (PARTITION BY id ORDER BY to_date(effectiveDate, "dd-MM-yyyy") DESC) AS version
    FROM source_data
    WHERE to_date(effectiveDate, "dd-MM-yyyy") < DATE '2021-01-01'
)
WHERE version = 1;
""")


                                                                                

DataFrame[]

### Zadanie 3

Zmień kraj zamieszkania na wartość `Poland` u wszystkich klientów posiadających wartość id mniejszą niż 50. 

Zwróć uwagę, że obecnie id jest ciągiem znaków. Użyj wyrażenia `cast(id as int)` aby wykonać zadanie prawidłowo.

### Rozwiązanie zadania 3

In [10]:
spark.sql("""
    UPDATE customers
    SET country = 'Poland'
    WHERE CAST(id AS INT) < 50
""")


                                                                                

DataFrame[num_affected_rows: bigint]

Na chwilę się zatrzymajmy. Utworzenie tabeli to wpis w metadanych. Wprowadzenie nowych danych to utworzenie nowych plików w katalogu tabeli. Czym było zmodyfikowanie tych danych? 

Wykonaj poniższe polecenie, aby przyglądnąć się zawartości katalogu należącego do tabeli `customers`

In [11]:
%%sh
hadoop fs -ls /tmp/delta-customers

Found 3 items
drwxr-xr-x   - root hadoop          0 2024-12-26 12:39 /tmp/delta-customers/_delta_log
-rw-r--r--   2 root hadoop       4424 2024-12-26 12:39 /tmp/delta-customers/part-00000-39f5e085-51bb-4810-a751-12da3f5987ce-c000.snappy.parquet
-rw-r--r--   2 root hadoop       4473 2024-12-26 12:38 /tmp/delta-customers/part-00000-8dfc8f44-04d4-4626-9e62-eee53aaa0b43-c000.snappy.parquet


Mamy katalog z logiem transakcyjnym i dwa pliki, które prawie nie różnią się wielkością.

Spróbujmy zrozumieć znaczenie tych plików zaglądając do historii zmian w naszej tabeli. 
Koniecznie przestudiuj kolumnę `operationMetrics`

In [12]:
df = spark.sql("describe history customers")

                                                                                

In [13]:
df.toPandas()

Unnamed: 0,version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,2,2024-12-26 12:39:01.160,,,UPDATE,{'predicate': '(cast(id#1555 as int) < 50)'},,,,1.0,Serializable,False,"{'numRemovedBytes': '4473', 'numAddedFiles': '...",,Apache-Spark/3.3.2 Delta-Lake/2.3.0
1,1,2024-12-26 12:38:46.954,,,WRITE,"{'mode': 'Append', 'partitionBy': '[]'}",,,,0.0,Serializable,True,"{'numOutputRows': '32', 'numOutputBytes': '447...",,Apache-Spark/3.3.2 Delta-Lake/2.3.0
2,0,2024-12-26 12:38:35.984,,,CREATE TABLE,"{'description': None, 'partitionBy': '[]', 'pr...",,,,,Serializable,True,{},,Apache-Spark/3.3.2 Delta-Lake/2.3.0


Poniższe zapytanie dostarcza danych o klientach, 
którzy pojawili się jako nowi, lub zmienili swoje dane w styczniu 2021.

In [14]:
df = spark.sql("""
select id, name, address, zipcode, city, country, effectiveDate
from  (
    select id, name, address, zipcode, city, country, effectiveDate, 
           rank() over (partition by id order by to_date(effectiveDate,"dd-MM-yyyy") desc) as version
    from   source_data
    where  to_date(effectiveDate,"dd-MM-yyyy") >= date "2021-01-01"
    and    to_date(effectiveDate,"dd-MM-yyyy") < date "2021-02-01"
    )
where version = 1""")

In [15]:
df.toPandas()

Unnamed: 0,id,name,address,zipcode,city,country,effectiveDate
0,100,Vance Palmer,"P.O. Box 221, 1718 Sociis Rd.",525734,Camarones,Poland,23-01-2021
1,14,Pearl Ward,401-3122 Aliquam Av.,4449,Elbistan,Spain,02-01-2021
2,21,Desirae Morin,Ap #675-9646 Ridiculus Avenue,22382,Bergen op Zoom,New Zealand,30-01-2021
3,27,Jerome Hines,598-974 Convallis Av.,10783,Tharparkar,Colombia,15-01-2021
4,29,Darius Cole,Ap #412-3424 Eu St.,12178,Birecik,New Zealand,30-01-2021
5,4,Robin Hartman,960-7120 Lectus Rd.,833082,Fundación,Peru,26-01-2021
6,44,Kasimir Irwin,156-1322 Nulla. Road,50218,Hồ Chí Minh City,Canada,30-01-2021
7,49,Justin Burch,806-9586 Quis Rd.,83477-576,Sokoto,Sweden,06-01-2021
8,59,Coby Blackwell,311-203 Ipsum St.,249414,Belfast,Mexico,13-01-2021
9,6,Jaime Dillon,8224 Amet Road,53604,Korneuburg,Peru,17-01-2021


### Zadanie 4

Chcemy zaktualizować dane naszych klientów w oparciu o ich styczniowe wersje. Zrobimy to w dwóch krokach

1. Usuniemy z tabeli `customers` dane już nieaktualne, a następnie 
2. Wstawimy do niej dane zgodne ze styczniowymi zmianami

Usuń z tabeli `customers` tych klientów, którzy zmienili swoje dane w styczniu 2021. Identyfikacja klientów odbywać się powinna za każdym razem w oparciu o atrybut `id`.

Jeśli okaże się, że polecenie `DELETE` nie wspiera podzapytań, skorzystaj z interfejsu w Scali,
lub za pomocą poniższego kodu, w dodatkowym paragrafie uzyskaj identyfikatory klientów do usunięcia, a następnie wkomponuj uzyskaną wartość w polecenie usuwające klientów.

```python
usun_ids_df = spark.sql("""
    SELECT id AS usun
    FROM (
        SELECT
            id, name, address, zipcode, city, country, effectiveDate,
            RANK() OVER (PARTITION BY id ORDER BY to_date(effectiveDate, 'dd-MM-yyyy') DESC) AS version
        FROM source_data
        WHERE to_date(effectiveDate, 'dd-MM-yyyy') >= '2021-01-01'
            AND to_date(effectiveDate, 'dd-MM-yyyy') < '2021-02-01'
    ) tab
    WHERE version = 1
""")

# Pobranie wyników jako listy
usun_ids_list = usun_ids_df.select("usun").rdd.flatMap(lambda x: x).collect()

# Konwersja do ciągu znaków
usun_ids_str = ",".join(map(str, usun_ids_list))
```

### Rozwiązanie zadania 4 - dodatkowy paragraf

In [16]:
usun_ids_df = spark.sql("""
    SELECT id AS usun
    FROM (
        SELECT
            id, name, address, zipcode, city, country, effectiveDate,
            RANK() OVER (PARTITION BY id ORDER BY to_date(effectiveDate, 'dd-MM-yyyy') DESC) AS version
        FROM source_data
        WHERE to_date(effectiveDate, 'dd-MM-yyyy') >= '2021-01-01'
            AND to_date(effectiveDate, 'dd-MM-yyyy') < '2021-02-01'
    ) tab
    WHERE version = 1
""")

# Pobranie wyników jako listy
usun_ids_list = usun_ids_df.select("usun").rdd.flatMap(lambda x: x).collect()

# Konwersja do ciągu znaków
usun_ids_str = ",".join(map(str, usun_ids_list))

                                                                                

### Rozwiązanie zadania 4

In [17]:
# Wykonanie zapytania SQL do usunięcia rekordów
spark.sql(f"""
    DELETE FROM customers
    WHERE id IN ({usun_ids_str})
""")

                                                                                

DataFrame[num_affected_rows: bigint]

Za pomocą poniższego polecenia wstaw do tabeli `customers` dane klientów, 
którzy zmienili swoje dane w styczniu 2021.

In [18]:
spark.sql("""
INSERT INTO customers 
select id, name, address, zipcode, city, country, effectiveDate
from  (
    select id, name, address, zipcode, city, country, effectiveDate, 
           rank() over (partition by id order by to_date(effectiveDate,"dd-MM-yyyy") desc) as version
    from   source_data
    where  to_date(effectiveDate,"dd-MM-yyyy") >= date "2021-01-01"
    and    to_date(effectiveDate,"dd-MM-yyyy") < date "2021-02-01"
    )
where version = 1""")

                                                                                

DataFrame[]

Zanim przejdziemy dalej, ponownie zastanówmy się nad tym co działo się pod spodem. 

Spróbuj odpowiedzieć na dwa pytania:

1. Ile nowych plików przybyło po naszych dwóch operacjach `delete` i `insert`.
2. Ile plików będzie aktywnych - wykorzystywanych przy zapytaniach?

Znasz odpowiedzi? Sprawdźmy czy są one prawidłowe.

Wykonaj poniższe polecenia

In [19]:
%%sh
hadoop fs -ls /tmp/delta-customers

Found 5 items
drwxr-xr-x   - root hadoop          0 2024-12-26 12:39 /tmp/delta-customers/_delta_log
-rw-r--r--   2 root hadoop       4433 2024-12-26 12:39 /tmp/delta-customers/part-00000-05b50041-aa6e-467e-bdf5-0cab105d90b3-c000.snappy.parquet
-rw-r--r--   2 root hadoop       4424 2024-12-26 12:39 /tmp/delta-customers/part-00000-39f5e085-51bb-4810-a751-12da3f5987ce-c000.snappy.parquet
-rw-r--r--   2 root hadoop       4473 2024-12-26 12:38 /tmp/delta-customers/part-00000-8dfc8f44-04d4-4626-9e62-eee53aaa0b43-c000.snappy.parquet
-rw-r--r--   2 root hadoop       3218 2024-12-26 12:39 /tmp/delta-customers/part-00000-b4ac768a-b217-4b89-a04d-3b12f37aaa33-c000.snappy.parquet


In [20]:
df = spark.sql("describe history customers")

                                                                                

In [21]:
df.toPandas()

Unnamed: 0,version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,4,2024-12-26 12:39:26.863,,,WRITE,"{'mode': 'Append', 'partitionBy': '[]'}",,,,3.0,Serializable,True,"{'numOutputRows': '15', 'numOutputBytes': '321...",,Apache-Spark/3.3.2 Delta-Lake/2.3.0
1,3,2024-12-26 12:39:20.851,,,DELETE,"{'predicate': '[""(spark_catalog.default.custom...",,,,2.0,Serializable,False,"{'numRemovedBytes': '4424', 'numAddedFiles': '...",,Apache-Spark/3.3.2 Delta-Lake/2.3.0
2,2,2024-12-26 12:39:01.160,,,UPDATE,{'predicate': '(cast(id#1555 as int) < 50)'},,,,1.0,Serializable,False,"{'numRemovedBytes': '4473', 'numAddedFiles': '...",,Apache-Spark/3.3.2 Delta-Lake/2.3.0
3,1,2024-12-26 12:38:46.954,,,WRITE,"{'mode': 'Append', 'partitionBy': '[]'}",,,,0.0,Serializable,True,"{'numOutputRows': '32', 'numOutputBytes': '447...",,Apache-Spark/3.3.2 Delta-Lake/2.3.0
4,0,2024-12-26 12:38:35.984,,,CREATE TABLE,"{'description': None, 'partitionBy': '[]', 'pr...",,,,,Serializable,True,{},,Apache-Spark/3.3.2 Delta-Lake/2.3.0


Powyższa kombinacja poleceń `delete` oraz `insert` w prosty sposób 
może zostać zastąpiona poleceniem `merge`.

### Zadanie 5

Korzystając z jednego polecenia `merge` jednocześnie
*   wstaw nowych klientów, którzy pojawili się po raz pierwszy w lutym 2021
*   zaktualizuj dane adresowe oraz `effectiveDate` tych klientów, którzy w tym samym czasie te dane zmienili. 

Poniższe polecenie uzyskuje dane z obu grup klientów:

```python
select id, name, address, zipcode, city, country, effectiveDate
from  (
    select id, name, address, zipcode, city, country, effectiveDate, 
           rank() over (partition by id order by to_date(effectiveDate,"dd-MM-yyyy") desc) as version
    from   source_data
    where  to_date(effectiveDate,"dd-MM-yyyy") >= date "2021-02-01"
    and    to_date(effectiveDate,"dd-MM-yyyy") < date "2021-03-01"
    )
where version = 1
```

### Rozwiązanie zadania 5

In [22]:
spark.sql("""
MERGE INTO customers
USING (
    SELECT id, name, address, zipcode, city, country, effectiveDate
    FROM (
        SELECT id, name, address, zipcode, city, country, effectiveDate, 
               RANK() OVER (PARTITION BY id ORDER BY to_date(effectiveDate, 'dd-MM-yyyy') DESC) AS version
        FROM source_data
        WHERE to_date(effectiveDate, 'dd-MM-yyyy') >= DATE '2021-02-01'
          AND to_date(effectiveDate, 'dd-MM-yyyy') < DATE '2021-03-01'
    ) AS subquery
    WHERE version = 1
) new_customers
ON customers.id = new_customers.id
WHEN MATCHED THEN
    UPDATE SET 
        customers.address = new_customers.address,
        customers.zipcode = new_customers.zipcode, 
        customers.city = new_customers.city,
        customers.country = new_customers.country,
        customers.effectiveDate = new_customers.effectiveDate
WHEN NOT MATCHED THEN
    INSERT (id, name, address, zipcode, city, country, effectiveDate)
    VALUES (new_customers.id, new_customers.name, new_customers.address, 
            new_customers.zipcode, new_customers.city, new_customers.country, new_customers.effectiveDate)
""")


24/12/26 12:39:41 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for full outer join.
24/12/26 12:39:41 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for full outer join.
24/12/26 12:39:41 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for full outer join.
                                                                                

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

Za pomocą poniższego polecenia sprawdź, z których miesięcy pochodzą dane naszych klientów.

In [23]:
df = spark.sql("""
select substr(effectiveDate,4,10) as month,
       count(*) as how_many
from customers
group by substr(effectiveDate,4,10)
order by to_date(substr(effectiveDate,4,10),"MM-yyyy")""")

In [24]:
df.toPandas()

Unnamed: 0,month,how_many
0,10-2020,6
1,11-2020,12
2,12-2020,7
3,01-2021,15
4,02-2021,8


## Struktury plików, transakcje

Przed chwilą wykonaliśmy szereg operacji DML na naszej tabeli *customers*. 
Każda z nich, z jednej strony była oddzielną transakcją zapisaną w logach Delta Lake, z drugiej strony każda z nich dokonała pewnych zmian w plikach naszej tabeli.  

Za pomocą kilku kolejnych paragrafów rozglądniemy się na początku po strukturze plików, potem postaramy się wydobyć informacje na temat naszych transakcji.

In [25]:
# w kolumnie location znajdziesz katalog, w którym nasza tabela jest przechowywana
df = spark.sql("describe detail customers")

In [26]:
df.toPandas()

Unnamed: 0,format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion,tableFeatures
0,delta,a85775b1-c55e-40e4-8859-293d3394fda4,default.customers,,hdfs://pbd-cluster-m/tmp/delta-customers,2024-12-26 12:38:35.873,2024-12-26 12:39:42.930,[],2,7749,{},1,2,"[appendOnly, invariants]"


Sprawdź ile plików znajduje się w tym katalogu

Sprawdź ile plików ma najstarsze daty - powstało gdy po raz pierwszy załadowaliśmy do tabeli dane

W tym celu wykonaj poniższe polecenia:

In [27]:
%%sh 
hadoop fs -ls -t /tmp/delta-customers

Found 6 items
drwxr-xr-x   - root hadoop          0 2024-12-26 12:39 /tmp/delta-customers/_delta_log
-rw-r--r--   2 root hadoop       4531 2024-12-26 12:39 /tmp/delta-customers/part-00000-26ab0244-e89a-4d09-b659-bcea157546f9-c000.snappy.parquet
-rw-r--r--   2 root hadoop       3218 2024-12-26 12:39 /tmp/delta-customers/part-00000-b4ac768a-b217-4b89-a04d-3b12f37aaa33-c000.snappy.parquet
-rw-r--r--   2 root hadoop       4433 2024-12-26 12:39 /tmp/delta-customers/part-00000-05b50041-aa6e-467e-bdf5-0cab105d90b3-c000.snappy.parquet
-rw-r--r--   2 root hadoop       4424 2024-12-26 12:39 /tmp/delta-customers/part-00000-39f5e085-51bb-4810-a751-12da3f5987ce-c000.snappy.parquet
-rw-r--r--   2 root hadoop       4473 2024-12-26 12:38 /tmp/delta-customers/part-00000-8dfc8f44-04d4-4626-9e62-eee53aaa0b43-c000.snappy.parquet


In [28]:
df = spark.sql("describe history customers")

In [29]:
df.toPandas()

Unnamed: 0,version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,5,2024-12-26 12:39:42.930,,,MERGE,"{'matchedPredicates': '[{""actionType"":""update""...",,,,4.0,Serializable,False,"{'numOutputRows': '33', 'numTargetBytesAdded':...",,Apache-Spark/3.3.2 Delta-Lake/2.3.0
1,4,2024-12-26 12:39:26.863,,,WRITE,"{'mode': 'Append', 'partitionBy': '[]'}",,,,3.0,Serializable,True,"{'numOutputRows': '15', 'numOutputBytes': '321...",,Apache-Spark/3.3.2 Delta-Lake/2.3.0
2,3,2024-12-26 12:39:20.851,,,DELETE,"{'predicate': '[""(spark_catalog.default.custom...",,,,2.0,Serializable,False,"{'numRemovedBytes': '4424', 'numAddedFiles': '...",,Apache-Spark/3.3.2 Delta-Lake/2.3.0
3,2,2024-12-26 12:39:01.160,,,UPDATE,{'predicate': '(cast(id#1555 as int) < 50)'},,,,1.0,Serializable,False,"{'numRemovedBytes': '4473', 'numAddedFiles': '...",,Apache-Spark/3.3.2 Delta-Lake/2.3.0
4,1,2024-12-26 12:38:46.954,,,WRITE,"{'mode': 'Append', 'partitionBy': '[]'}",,,,0.0,Serializable,True,"{'numOutputRows': '32', 'numOutputBytes': '447...",,Apache-Spark/3.3.2 Delta-Lake/2.3.0
5,0,2024-12-26 12:38:35.984,,,CREATE TABLE,"{'description': None, 'partitionBy': '[]', 'pr...",,,,,Serializable,True,{},,Apache-Spark/3.3.2 Delta-Lake/2.3.0


In [30]:
from delta.tables import DeltaTable

# Inicjalizacja ścieżki do tabeli Delta
delta_path = "/tmp/delta-customers"
cust_delta_table = DeltaTable.forPath(spark, delta_path)

# Pobranie pełnej historii tabeli Delta
full_hist_df = cust_delta_table.history().select(
    "version", "timestamp", "operation", "isBlindAppend", "operationMetrics"
)

In [31]:
full_hist_df.toPandas()

Unnamed: 0,version,timestamp,operation,isBlindAppend,operationMetrics
0,5,2024-12-26 12:39:42.930,MERGE,False,"{'numOutputRows': '33', 'numTargetBytesAdded':..."
1,4,2024-12-26 12:39:26.863,WRITE,True,"{'numOutputRows': '15', 'numOutputBytes': '321..."
2,3,2024-12-26 12:39:20.851,DELETE,False,"{'numRemovedBytes': '4424', 'numAddedFiles': '..."
3,2,2024-12-26 12:39:01.160,UPDATE,False,"{'numRemovedBytes': '4473', 'numAddedFiles': '..."
4,1,2024-12-26 12:38:46.954,WRITE,True,"{'numOutputRows': '32', 'numOutputBytes': '447..."
5,0,2024-12-26 12:38:35.984,CREATE TABLE,True,{}


**Kilka pytań**

Zwróć uwagę, że niektóre z tych operacji mają flagę `isBlindAppend` zapaloną. **Które to były operacje?**

Każda zmiana - polecenia: `update`, `delete` czy `merge` "usuwała" dużą część (być może nawet wszystkie) aktywnych plików zastępując je nowymi. 

Co mogłoby spowodować, że liczba zastępowanych plików nie obejmowałaby wszystkich aktywnych plików?

# Podróż w czasie

Delta Lake to nie tylko wydajna obsługa (i możliwość wykonania) operacji DML, 
to także łatwość w odzyskiwaniu zniszczonych danych.

## Podstawy - czas i numery wersji

Na początku trochę podstaw. 

Sprawdź czas wykonania Twojej operacji `update`, która zmieniła kraj niektórym klientom na `Poland`.

In [32]:
from delta.tables import DeltaTable
from pyspark.sql import functions as F

# Inicjalizacja ścieżki do tabeli Delta
delta_path = "/tmp/delta-customers"
delta_table = DeltaTable.forPath(spark, delta_path)

# Pobranie historii operacji UPDATE
history = delta_table.history().where(F.col("operation") == "UPDATE").select("timestamp", "version")

In [33]:
history.toPandas()

Unnamed: 0,timestamp,version
0,2024-12-26 12:39:01.160,2


Skorzystaj teraz z tej daty, aby w poniższym paragrafie uzyskać dane jakie obowiązywały przed tą datą. 

In [34]:
timestamp_value = history.orderBy(F.col("timestamp").desc()).first()["timestamp"]

df = (
    spark.read
    .format("delta")
    .option("timestampAsOf", timestamp_value)
    .load("/tmp/delta-customers")
    .where(col("country").like("P%"))
)

In [35]:
df.toPandas()

                                                                                

Unnamed: 0,id,name,address,zipcode,city,country,effectiveDate
0,10,Jin Terry,467-8297 Enim,35633573,Balıkesir,Poland,06-11-2020
1,11,Isabelle Stevenson,131-4245 Eleifend. Street,16142,Hà Giang,Poland,25-11-2020
2,14,Leo Mcleod,467-8297 Enim,39153,Borås,Poland,16-10-2020
3,16,Kaitlin Landry,623-5682 Augue St.,351225,Libramont-Chevigny,Poland,29-10-2020
4,19,Alden Harper,Ap #579-2185 Sed Street,94671-72608,Châtellerault,Poland,06-12-2020
5,2,Brandon Christian,476-5064 Suspendisse Rd.,93-765,Broxburn,Poland,28-11-2020
6,20,Kathleen Pugh,7018 Cras St.,3123,Ostrowiec Świętokrzyski,Poland,16-11-2020
7,26,Ulysses Dillard,1318 Tempor Rd.,S5J 6Z2,Tuscaloosa,Poland,11-10-2020
8,28,Shaine Puckett,Ap #579-2185 Sed Street,85629,Cochrane,Poland,07-12-2020
9,33,Alexander Becker,Ap #631-7469 Curae St.,29941,Anseong,Poland,04-11-2020


In [36]:
version_value = history.orderBy(F.col("version").desc()).first()["version"] - 1

df = (
    spark.read
    .format("delta")
    .option("versionAsOf", version_value)
    .load("/tmp/delta-customers")
    .where(col("country").like("P%"))
)

In [37]:
df.toPandas()

                                                                                

Unnamed: 0,id,name,address,zipcode,city,country,effectiveDate
0,20,Kathleen Pugh,7018 Cras St.,3123,Ostrowiec Świętokrzyski,Peru,16-11-2020
1,28,Shaine Puckett,Ap #579-2185 Sed Street,85629,Cochrane,Poland,07-12-2020
2,68,Clarke Carlson,2296 Vestibulum St.,163826,Logan City,Poland,28-10-2020
3,74,Vernon Casey,650-5308 Felis Rd.,42987,Mirpur,Pakistan,14-11-2020


Skorzystaj z tej daty ponownie w poniższym paragrafie, 
aby tym razem uzyskać dane jakie obowiązywały po modyfikacji.

In [38]:
df = (
    spark.read
    .format("delta")
    .option("timestampAsOf", timestamp_value)
    .load("/tmp/delta-customers")
    .where(col("country").like("P%"))
)

In [39]:
df.toPandas()

                                                                                

Unnamed: 0,id,name,address,zipcode,city,country,effectiveDate
0,10,Jin Terry,467-8297 Enim,35633573,Balıkesir,Poland,06-11-2020
1,11,Isabelle Stevenson,131-4245 Eleifend. Street,16142,Hà Giang,Poland,25-11-2020
2,14,Leo Mcleod,467-8297 Enim,39153,Borås,Poland,16-10-2020
3,16,Kaitlin Landry,623-5682 Augue St.,351225,Libramont-Chevigny,Poland,29-10-2020
4,19,Alden Harper,Ap #579-2185 Sed Street,94671-72608,Châtellerault,Poland,06-12-2020
5,2,Brandon Christian,476-5064 Suspendisse Rd.,93-765,Broxburn,Poland,28-11-2020
6,20,Kathleen Pugh,7018 Cras St.,3123,Ostrowiec Świętokrzyski,Poland,16-11-2020
7,26,Ulysses Dillard,1318 Tempor Rd.,S5J 6Z2,Tuscaloosa,Poland,11-10-2020
8,28,Shaine Puckett,Ap #579-2185 Sed Street,85629,Cochrane,Poland,07-12-2020
9,33,Alexander Becker,Ap #631-7469 Curae St.,29941,Anseong,Poland,04-11-2020


In [40]:
df = (
    spark.read
    .format("delta")
    .option("versionAsOf", version_value)
    .load("/tmp/delta-customers")
    .where(col("country").like("P%"))
)

In [41]:
df.toPandas()

                                                                                

Unnamed: 0,id,name,address,zipcode,city,country,effectiveDate
0,20,Kathleen Pugh,7018 Cras St.,3123,Ostrowiec Świętokrzyski,Peru,16-11-2020
1,28,Shaine Puckett,Ap #579-2185 Sed Street,85629,Cochrane,Poland,07-12-2020
2,68,Clarke Carlson,2296 Vestibulum St.,163826,Logan City,Poland,28-10-2020
3,74,Vernon Casey,650-5308 Felis Rd.,42987,Mirpur,Pakistan,14-11-2020


## Rollback

### Zadanie 6

Wyobraź sobie, że ta operacja zmiany kraju na wartość `Poland` okazała się być błędna. 
Twoim zadaniem jest przywrócić wartości, które zostały nadpisane przez tą modifikację. 
Nie naprawiaj danych jeśli pojawiły się późniejsze (po Twoim poleceniu `update`) aktualizacje adresu (skorzystaj z `effectiveDate`).
Świetnie do takiej naprawy może się przydać operacja `merge`. Tym razem będzie ona miała tylko sekcję `WHEN MATCHED THEN`.

Oczywiście nie korzystaj z danych źródłowych - ich już nie ma. Jest tylko Twoja tabela Delta Lake. 

Skorzystaj z DataFrame API.

In [42]:
# błędne dane nie tylko są w historii, one są nadal w bieżącej postaci naszych danych
df = spark.sql("""
select * 
from customers 
where country like 'P%' 
  and to_date(effectiveDate,'dd-MM-yyyy') < date '2021-01-01'""")

In [43]:
df.toPandas()

                                                                                

Unnamed: 0,id,name,address,zipcode,city,country,effectiveDate
0,10,Jin Terry,467-8297 Enim,35633573,Balıkesir,Poland,06-11-2020
1,11,Isabelle Stevenson,131-4245 Eleifend. Street,16142,Hà Giang,Poland,25-11-2020
2,16,Kaitlin Landry,623-5682 Augue St.,351225,Libramont-Chevigny,Poland,29-10-2020
3,19,Alden Harper,Ap #579-2185 Sed Street,94671-72608,Châtellerault,Poland,06-12-2020
4,2,Brandon Christian,476-5064 Suspendisse Rd.,93-765,Broxburn,Poland,28-11-2020
5,20,Kathleen Pugh,7018 Cras St.,3123,Ostrowiec Świętokrzyski,Poland,16-11-2020
6,26,Ulysses Dillard,1318 Tempor Rd.,S5J 6Z2,Tuscaloosa,Poland,11-10-2020
7,28,Shaine Puckett,Ap #579-2185 Sed Street,85629,Cochrane,Poland,07-12-2020
8,33,Alexander Becker,Ap #631-7469 Curae St.,29941,Anseong,Poland,04-11-2020
9,41,Plato Vaughan,Ap #502-453 Non Rd.,06255-18554,Zoerle-Parwijs,Poland,25-11-2020


### Rozwiązanie zadania 6

In [44]:
from delta.tables import DeltaTable

# Inicjalizacja ścieżki do tabeli Delta
delta_path = "/tmp/delta-customers"
delta_table = DeltaTable.forPath(spark, delta_path)

# Wczytanie poprawnych danych z tabeli Delta na podstawie numeru wersji
poprawne_df = (
    spark.read
    .format("delta")
    .option("versionAsOf", version_value)
    .load(delta_path)
)

# Wykonanie operacji MERGE na tabeli Delta
delta_table.alias("old") \
    .merge(
        poprawne_df.alias("new"), 
        "old.id = new.id and to_date(old.effectiveDate,'dd-MM-yyyy') < date '2021-01-01'"
    ) \
    .whenMatchedUpdate(
        set={"country": "new.country"}
    ) \
    .execute()

                                                                                

Sprawdź czy operacja przywracania poprzedniej wersji danych się powiodła. 

In [45]:
# błędne dane nie tylko są w historii, one są nadal w bieżącej postaci naszych danych
df = spark.sql("""
select * 
from customers 
where country like 'P%' 
  and to_date(effectiveDate,'dd-MM-yyyy') < date '2021-01-01'""")

In [46]:
df.toPandas()

                                                                                

Unnamed: 0,id,name,address,zipcode,city,country,effectiveDate
0,20,Kathleen Pugh,7018 Cras St.,3123,Ostrowiec Świętokrzyski,Peru,16-11-2020
1,28,Shaine Puckett,Ap #579-2185 Sed Street,85629,Cochrane,Poland,07-12-2020
2,68,Clarke Carlson,2296 Vestibulum St.,163826,Logan City,Poland,28-10-2020
3,74,Vernon Casey,650-5308 Felis Rd.,42987,Mirpur,Pakistan,14-11-2020


Oczywiście należy pamiętać, że powyższa funkcjonalność to nie podróż w czasie w dowolnym zakresie. 

Nieaktualne pliki są sukcesywnie usuwane. 

Niemniej, prosta możliwość naprawy popełnionego właśnie przed chwilą błędu jest nieoceniona w świecie Big Data. To dlatego świat Big Data z zasady nie pozwala niczego modyfikować, a tworzy jedynie nowe na podstawie starego. W przypadku problemów stare, może zostać wykorzystane. 

Dokładnie to dzieje się pod maską w Delta Lake.

# Zmiany schematu

Zmiana zawartości danych to jedno. Ale świat zmienia się znacznie bardziej. Wymaga to często zmian definicji struktur naszych danych. 

Przykładowo, nasz atrybut `effectiveDate` jest ciągiem znaków, ale od dziś dobrze aby był datą. 
To samo dotyczy `id`. Od teraz wolimy, aby był liczbą. 

Tabele Delta Lake są przygotowane na takie modyfikacje. Wynika to poniekąd z formatu plików jaki jest wykorzystywany pod spodem. 

## Walidacja schematu 

Po pierwsze, Delta Lake dokonuje walidacji danych podczas wstawiania nowych danych. 

In [47]:
# przed zmianami
df = spark.sql("describe customers")

In [48]:
df.toPandas()

Unnamed: 0,col_name,data_type,comment
0,id,string,
1,name,string,
2,address,string,
3,zipcode,string,
4,city,string,
5,country,string,
6,effectiveDate,string,
7,,,
8,# Partitioning,,
9,Not partitioned,,


Spróbuj wstawić dane z marca 2021, uzupełnione o dodatkową kolumnę `endDate`.
```python
select id, 
       name, address, zipcode, city, country, 
       effectiveDate, 
       cast(null as date) as endDate
from  (
    select id, name, address, zipcode, city, country, effectiveDate,
           rank() over (partition by id order by to_date(effectiveDate,"dd-MM-yyyy") desc) as version
    from   source_data
    where  to_date(effectiveDate,"dd-MM-yyyy") >= date "2021-03-01"
    and    to_date(effectiveDate,"dd-MM-yyyy") < date "2021-04-01"
    )
where version = 1
```

In [49]:
spark.sql("""
INSERT INTO customers
select id, 
       name, address, zipcode, city, country, 
       effectiveDate, 
       cast(null as date) as endDate
from  (
    select id, name, address, zipcode, city, country, effectiveDate,
           rank() over (partition by id order by to_date(effectiveDate,"dd-MM-yyyy") desc) as version
    from   source_data
    where  to_date(effectiveDate,"dd-MM-yyyy") >= date "2021-03-01"
    and    to_date(effectiveDate,"dd-MM-yyyy") < date "2021-04-01"
    )
where version = 1""")

                                                                                

DataFrame[]

Dzięki parametrowi `spark.databricks.delta.schema.autoMerge.enabled=true` doszło do automatycznej integracji schematu docelowej tabeli z postacią danych, które były do niej wstawiane. 

Kolumna `endDate` jest nam potrzebna i będzie wykorzystywana w następnym zadaniu. 

W wersjach Delta Lake 2.2 i wcześniejszych należało wykorzystać poniższe rozwiązanie do uzyskania takiej integracji

```python
# Wybór danych z okresu od '2021-03-01' do '2021-04-01'
data_032021 = (
    source_data
    .filter((to_date("effectiveDate", "dd-MM-yyyy") >= '2021-03-01') & (to_date("effectiveDate", "dd-MM-yyyy") < '2021-04-01'))
    .withColumn("version", row_number().over(Window.partitionBy("id").orderBy(col("effectiveDate").desc())))
    .filter("version = 1")
    .withColumn("endDate", lit(None).cast("date"))
    .select("id", "name", "address", "zipcode", "city", "country", "effectiveDate", "endDate")
)

# Zapis danych do tabeli Delta
data_032021.write.option("mergeSchema", "true").format("delta").mode("append").saveAsTable("customers")
```


Sprawdź jak wygląda schemat Twojej tabeli po zmianach 

In [50]:
df = spark.sql("describe customers")

In [51]:
df.toPandas()

Unnamed: 0,col_name,data_type,comment
0,id,string,
1,name,string,
2,address,string,
3,zipcode,string,
4,city,string,
5,country,string,
6,effectiveDate,string,
7,endDate,date,
8,,,
9,# Partitioning,,


 
Niektóre zmiany schematu wymagają nadpisania całej zawartości tabeli 

`.mode("overwrite").option("overwriteSchema", "true")`. 

Źródłem danych może być oczywiście ta sama tabela, w tym również jej poprzednia wersja.

## Zadanie 7

Korzystając z tego mechanizmu zmień typy kolumn, o których wspominaliśmy.
Przy okazji wycofaj ostatnią aktualizację. Nie była przemyślana - dodaliśmy nowe dane (`append`) a powinniśmy uwzględnić fakt, że nie wszyscy klienci w nowym zestawie danych byli nowi, część z nich wymagała aktualizacji. W rezultacie pojawiły się duplikaty w naszych danych. 

Sprawdź, której wersji danych potrzebujesz.

In [52]:
from delta.tables import DeltaTable

# Inicjalizacja ścieżki do tabeli Delta
delta_path = "/tmp/delta-customers"
cust_delta_table = DeltaTable.forPath(spark, delta_path)

# Pobranie pełnej historii tabeli Delta
full_hist_df = cust_delta_table.history().select(
    "version", "timestamp", "operation", "isBlindAppend", "operationMetrics"
)

In [53]:
full_hist_df.toPandas()

Unnamed: 0,version,timestamp,operation,isBlindAppend,operationMetrics
0,7,2024-12-26 12:40:29.086,WRITE,True,"{'numOutputRows': '15', 'numOutputBytes': '343..."
1,6,2024-12-26 12:40:22.596,MERGE,False,"{'numOutputRows': '33', 'numTargetBytesAdded':..."
2,5,2024-12-26 12:39:42.930,MERGE,False,"{'numOutputRows': '33', 'numTargetBytesAdded':..."
3,4,2024-12-26 12:39:26.863,WRITE,True,"{'numOutputRows': '15', 'numOutputBytes': '321..."
4,3,2024-12-26 12:39:20.851,DELETE,False,"{'numRemovedBytes': '4424', 'numAddedFiles': '..."
5,2,2024-12-26 12:39:01.160,UPDATE,False,"{'numRemovedBytes': '4473', 'numAddedFiles': '..."
6,1,2024-12-26 12:38:46.954,WRITE,True,"{'numOutputRows': '32', 'numOutputBytes': '447..."
7,0,2024-12-26 12:38:35.984,CREATE TABLE,True,{}


### Rozwiązanie zadania 7

In [54]:
from pyspark.sql.functions import to_date, col, lit

version_value = full_hist_df.orderBy(col("version").desc()).first()["version"] - 1

# Załaduj dane z wersji przed ostatnią aktualizacją (na podstawie numeru wersji)
customers_df = (
  spark.read.format("delta")
  .option("versionAsOf", version_value)
  .table("customers")
  .withColumn("effectiveDate", to_date(col("effectiveDate"), "dd-MM-yyyy"))  # Konwersja kolumny effectiveDate na date
  .withColumn("zipcode", col("zipcode").cast("int"))  # Zmiana typu kolumny zipcode na int
  .withColumn("endDate", lit(None).cast("date"))  # Dodanie kolumny endDate z wartością null
)

# Nadpisanie tabeli Delta z nowymi danymi
(customers_df.write.format("delta")
  .mode("overwrite")  # Użycie trybu overwrite
  .option("overwriteSchema", "true")  # Nadpisanie schematu
  .saveAsTable("customers")
)


                                                                                

In [55]:
# odczekaj chwilę aby zmiany doszły do skutku
df = spark.sql("describe customers")

In [56]:
df.toPandas()

Unnamed: 0,col_name,data_type,comment
0,id,string,
1,name,string,
2,address,string,
3,zipcode,int,
4,city,string,
5,country,string,
6,effectiveDate,date,
7,endDate,date,
8,,,
9,# Partitioning,,


Zmiany można także wymusić ręcznie. 
Dodaj jeszcze jedną kolumnę za pomocą poniższego paragrafu.

In [57]:
spark.sql("""
ALTER TABLE customers ADD COLUMNS (current boolean COMMENT 'if current data' AFTER country)
""")

                                                                                

DataFrame[]

In [58]:
df = spark.sql("describe customers")

In [59]:
df.toPandas()

Unnamed: 0,col_name,data_type,comment
0,id,string,
1,name,string,
2,address,string,
3,zipcode,int,
4,city,string,
5,country,string,
6,current,boolean,if current data
7,effectiveDate,date,
8,endDate,date,
9,,,


# Finał 

Na zakończenie zaimplementujemy sposób utrzymywania tabel wymiarów jaki często wykorzystywany jest w hurtowniach danych.

## Slowly changing data (SCD) Type 2

Nasza tabela `customers` jest już na to gotowa. 
Kolumna `current` powinna być zapalona tylko dla najnowszych wersji danych. 
Kolumna `endDate` powinna mieć wartość zakończenia obowiązywania danej wersji danych jeśli pojawi się nowy wiersz z nową wersją danych. 



## Zadanie 8

**Zaimplementuj** funkcję `updateCustomers`, która na podstawie danych źródłowych z kolejnego *miesiąca* (parametr funkcji) będzie aktualizowała zawartość tabeli `customers` zgodnie a regułami ***SCD Type 2***.  

Po zakończonej implementacji **sprawdź jej działanie**. 

Jeśli uważasz, że obecne dane w tabeli customers powinny zostać poprawione, **dokonaj wcześniej stosownych korekt**. 

Jeśli chcesz skorzystaj ze strony:
https://docs.delta.io/latest/delta-update.html#-merge-in-scd-type-2



## Rozwiązanie zadania 8

In [60]:
def updateCustomers(new_data_df):
    delta_path = "/tmp/delta-customers"
    cust_delta_table = DeltaTable.forPath(spark, delta_path)
    
    # Sprawdzamy typ kolumny 'zipcode' w customers
    customer_df = spark.table("customers")
    zipcode_type = customer_df.schema["zipcode"].dataType
    
    # Ujednolicenie typu kolumny 'zipcode'
    if zipcode_type == "IntegerType":
        new_data_df = new_data_df.withColumn("zipcode", col("zipcode").cast("integer"))
    elif zipcode_type == "StringType":
        new_data_df = new_data_df.withColumn("zipcode", col("zipcode").cast("string"))

    return
    
    new_data_df = new_data_df.withColumn("current", lit(True)) \
        .withColumn("endDate", lit(None).cast("date"))
    
    # Merge z tabelą customers, aktualizacja danych
    cust_delta_table.alias("customers") \
        .merge(
            new_data_df.alias("new_data"),
            "customers.id = new_data.id AND customers.current = TRUE"
        ) \
        .whenMatchedUpdate(
            condition="customers.effectiveDate != new_data.effectiveDate",
            set={
                "customers.endDate": to_date(lit("current_date()")),
                "customers.current": lit(False)
            }
        ) \
        .execute()

    # Dopisanie nowych danych
    new_data_df.write.format("delta") \
        .mode("append") \
        .saveAsTable("customers")

Masz to? 

Jeśli tak, to przyjmij gratulacje. Sprawdźmy jak to działa dla danych z marca, które wycofaliśmy. 

Na początku zobaczmy jakie to będą dane.

In [61]:
data_032021 = spark.sql("""
select id, 
       name, address, zipcode, city, country, 
       effectiveDate, 
       cast(null as date) as endDate
from  (
    select id, name, address, zipcode, city, country, effectiveDate,
           rank() over (partition by id order by to_date(effectiveDate,"dd-MM-yyyy") desc) as version
    from   source_data
    where  to_date(effectiveDate,"dd-MM-yyyy") >= date "2021-03-01"
    and    to_date(effectiveDate,"dd-MM-yyyy") < date "2021-04-01"
    )
where version = 1""")

In [62]:
data_032021.toPandas()

Unnamed: 0,id,name,address,zipcode,city,country,effectiveDate,endDate
0,12,Tucker Russo,Ap #579-2185 Sed Street,5488 CT,Mataró,United Kingdom,17-03-2021,
1,24,Florence Landry,Ap #457-3976 Turpis. St.,725475,Pohang,Australia,27-03-2021,
2,31,Olga Ramsey,467-8297 Enim,O1N 1T2,Bandar Lampung,United Kingdom,21-03-2021,
3,33,Cody Alvarado,503-3360 Mattis St.,66750,Campina Grande,Turkey,22-03-2021,
4,34,Rae Walter,656-9008 Felis. Avenue,52946,Poza Rica,India,06-03-2021,
5,53,Kieran Preston,"337-6887 Tincidunt, St.",40825,Townsville,Nigeria,02-03-2021,
6,55,Phoebe Craft,"337-6887 Tincidunt, St.",6338 CW,Rachecourt,Spain,20-03-2021,
7,60,Thane Mcfarland,2050 Augue. Avenue,538383,Dublin,Belgium,27-03-2021,
8,62,Arthur Castro,"337-6887 Tincidunt, St.",644923,Dublin,Germany,10-03-2021,
9,66,Violet Harding,"P.O. Box 221, 1718 Sociis Rd.",3353,Nashik,Brazil,03-03-2021,


Zobaczmy ile z tych nowych wersji klientów istnieje już w naszych danych

In [63]:
from pyspark.sql.functions import col

# Wybór identyfikatorów z ramki danych data_032021
new_ids = data_032021.select("id").collect()
new_ids_list = [str(row.id) for row in new_ids]
new_ids_str = ",".join(new_ids_list)

# Wybór danych z tabeli customers, gdzie id znajduje się w new_ids_str
df = spark.sql(f"""
    SELECT *
    FROM customers
    WHERE id IN ({new_ids_str})
""")

In [64]:
df.toPandas()

                                                                                

Unnamed: 0,id,name,address,zipcode,city,country,current,effectiveDate,endDate
0,33,Alexander Becker,Ap #631-7469 Curae St.,29941.0,Anseong,United States,,2020-11-04,
1,62,Griffin Mooney,417-2786 Bibendum Ave,,Ghizer,Costa Rica,,2020-10-16,
2,69,Chaney Ray,4442 Duis Avenue,64757.0,Stockholm,Nigeria,,2021-01-28,


Uruchom swoją funkcję

In [65]:
updateCustomers(data_032021)

Sprawdźmy jak wyglądają stara i nowa wersja jednego ze zaktualizowanych klientów.

In [66]:
df = spark.sql("""
select * from customers
where id = 33 """)

In [67]:
df.toPandas()

                                                                                

Unnamed: 0,id,name,address,zipcode,city,country,current,effectiveDate,endDate
0,33,Alexander Becker,Ap #631-7469 Curae St.,29941,Anseong,United States,,2020-11-04,


Jeśli poprzednia wersja została zamknięta z odpowiednią datą a nowa z tą samą datą utworzona... 

to osiągneliśmy to o co nam chodziło... Delta Lake.