# Manipulating Tables with Delta Lake

This notebook provides a hands-on review of some of the basic functionality of Delta Lake.

## Learning Objectives

By the end of this lab, you should be able to:

* Execute standard operations to create and manipulate Delta Lake tables, including:
* `CREATE TABLE`
* `INSERT INTO`
* `SELECT FROM`
* `UPDATE`
* `DELETE`
* `MERGE`
* `DROP TABLE`


## Create a Table

In this notebook, we'll be creating a table to track our bean collection.
Use the cell below to create a managed Delta Lake table named `beans`.

Provide the following schema:

| Field Name | Field type |
| --- | --- |
| name | STRING |
| color | STRING |
| grams | FLOAT |
| delicious | BOOLEAN |

---
```sql
-- ANSWER
CREATE TABLE beans
(name STRING, color STRING, grams FLOAT, delicious BOOLEAN);
```
---



> **NOTE:** We'll use Python to run checks occasionally throughout the lab. The following cell will return as error with a message on what needs to change if you have not followed instructions. No output from cell execution means that you have completed this step.


---
```python
%python
assert spark.table("beans"), "Table named `beans` does not exist"
assert spark.table("beans").columns == ["name", "color", "grams", "delicious"], "Please name the columns in the order provided above"
assert spark.table("beans").dtypes == [("name", "string"), ("color", "string"), ("grams", "float"), ("delicious", "boolean")], "Please make sure the column types are identical to those provided above"

```
---



In [0]:
%sql
CREATE TABLE IF NOT EXISTS beans
(name STRING, color STRING, grams FLOAT, delicious BOOLEAN);

SELECT * FROM beans;

In [0]:
spark.table('beans').dtypes

In [0]:
%python
assert spark.table('beans'), "Table named 'beans' does not exist"
assert spark.table('beans').columns == ['name', 'color', 'grams', 'delicious'], 'Please name the columns in the order provided above'
assert spark.table('beans').dtypes == [('name', 'string'), ('color', 'string'), ('grams', 'float'), ('delicious', 'boolean')], 'Please make sure the column types are identical as above'

## Insert Data

Run the following cell to insert three rows into the table.


```sql
INSERT INTO beans VALUES
("black", "black", 500, true),
("lentils", "brown", 1000, true),
("jelly", "rainbow", 42.5, false)

```


Manually review the table contents to ensure data was written as expected.

---


Insert the additional records provided below. Make sure you execute this as a single transaction.


```sql
-- ANSWER
INSERT INTO beans VALUES
('pinto', 'brown', 1.5, true),
('green', 'green', 178.3, true),
('beanbag chair', 'white', 40000, false)

```

Run the cell below to confirm the data is in the proper state.

```python
%python
assert spark.table("beans").count() == 6, "The table should have 6 records"
assert spark.conf.get("spark.databricks.delta.lastCommitVersionInSession") == "2", "Only 3 commits should have been made to the table"
assert set(row["name"] for row in spark.table("beans").select("name").collect()) == {'beanbag chair', 'black', 'green', 'jelly', 'lentils', 'pinto'}, "Make sure you have not modified the data provided"

```

---



In [0]:
%sql
INSERT INTO beans VALUES
('black', 'black', 500, true),
('lentils', 'brown', 1000, true),
('jelly', 'rainbow', 42.5, false);

SELECT * FROM beans;

In [0]:
%sql
INSERT INTO beans VALUES
('pinto', 'black', 1.5, true),
('green', 'green', 178.3, true),
('beangab chair', 'white', 40000, false);

SELECT * FROM beans;

In [0]:
spark.table('beans').describe().show()

In [0]:
'''
assert spark.table("beans").count() == 6, "The table should have 6 records"
assert spark.conf.get("spark.databricks.delta.lastCommitVersionInSession") == "2", "Only 3 commits should have been made to the table"
assert set(row["name"] for row in spark.table("beans").select("name").collect()) == {'beanbag chair', 'black', 'green', 'jelly', 'lentils', 'pinto'}, "Make sure you have not modified the data provided"
'''
assert spark.table('beans').count() == 6, 'The number of rows is wrong'
assert spark.conf.get("spark.databricks.delta.lastCommitVersionInSession") == "2", "Only 3 commits should have been made to the table"
assert set(row["name"] for row in spark.table("beans").select("name").collect()) == {'pinto', 'green', 'beangab chair', 'black', 'lentils', 'jelly'}, "Make sure you have not modified the data provided"

In [0]:
for row in spark.table('beans').select('name').collect():
    print(row)

## Update Records

A friend is reviewing your inventory of beans. After much debate, you agree that jelly beans are delicious.
Run the following cell to update this record.


```sql
UPDATE beans
SET delicious = true
WHERE name = "jelly"

```

You realize that you've accidentally entered the weight of your pinto beans incorrectly.
Update the `grams` column for this record to the correct weight of 1500.

```sql
-- ANSWER
UPDATE beans
SET grams = 1500
WHERE name = 'pinto'

```


Run the cell below to confirm this has completed properly.


```python
%python
assert spark.table("beans").filter("name='pinto'").count() == 1, "There should only be 1 entry for pinto beans"
row = spark.table("beans").filter("name='pinto'").first()
assert row["color"] == "brown", "The pinto bean should be labeled as the color brown"
assert row["grams"] == 1500, "Make sure you correctly specified the `grams` as 1500"
assert row["delicious"] == True, "The pinto bean is a delicious bean"

```

---



In [0]:
spark.table('beans').show()

In [0]:
%sql
UPDATE beans
SET delicious = true
WHERE name = 'jelly';

SELECT * FROM beans
WHERE name = 'jelly';

In [0]:
%sql
UPDATE beans
SET grams = 1500.00
WHERE name = 'pinto';

SELECT * FROM beans
WHERE name = 'pinto';

In [0]:
assert spark.table('beans').filter('name == "pinto"').count() == 1, "There should only be 1 entry for pinto beans"
row = spark.table('beans').filter('name == "pinto"').first()
assert row['color'] == 'brown', "The pinto bean should be labeled as the color brown"
assert row['grams'] == 1500, "Make sure you correctly specified the `grams` as 1500"
assert row['delicious'] == True, "The pinto bean is a delicious bean"

In [0]:
%sql
UPDATE beans
SET color = 'brown'
WHERE name = 'pinto';

SELECT * FROM beans
WHERE name = 'pinto';

In [0]:
assert spark.table('beans').filter('name == "pinto"').count() == 1, "There should only be 1 entry for pinto beans"
row = spark.table('beans').filter('name == "pinto"').first()
assert row['color'] == 'brown', "The pinto bean should be labeled as the color brown"
assert row['grams'] == 1500, "Make sure you correctly specified the `grams` as 1500"
assert row['delicious'] == True, "The pinto bean is a delicious bean"

## Delete Records

You've decided that you only want to keep track of delicious beans.
Execute a query to drop all beans that are not delicious.


```sql
-- ANSWER
DELETE FROM beans
WHERE delicious = false

```

Run the following cell to confirm this operation was successful.

```python
%python
assert spark.table("beans").filter("delicious=true").count() == 5, "There should be 5 delicious beans in your table"
assert spark.table("beans").filter("name='beanbag chair'").count() == 0, "Make sure your logic deletes non-delicious beans"

```

---


In [0]:
%sql
DELETE FROM beans
WHERE delicious = 'false';

SELECT * FROM beans;

In [0]:
#"There should be 5 delicious beans in your table"
#"Make sure your logic deletes non-delicious beans"

In [0]:
assert spark.table('beans').filter('delicious == True').count() == 5, "There should be 5 delicious beans in your table"
assert spark.table('beans').filter('delicious == False').count() == 0, "Make sure your logic deletes non-delicious beans"
assert spark.table("beans").filter("name='beanbag chair'").count() == 0, "Make sure your logic deletes non-delicious beans"

## Using Merge to Upsert Records

*Your friend gives you some new beans. The cell below registers these as a temporary view.*

```sql
CREATE OR REPLACE TEMP VIEW new_beans(name, color, grams, delicious) AS VALUES
('black', 'black', 60.5, true),
('lentils', 'green', 500, true),
('kidney', 'red', 387.2, true),
('castor', 'brown', 25, false);

SELECT * FROM new_beans
```

---

In the cell below, use the above view to write a merge statement to update and insert new records to your `beans` table as one transaction.

Make sure your logic:

* Matches beans by **name** and **color**
* Updates existing beans by **adding the new weight to the existing weight**
* Inserts new beans **only if they are delicious**

---
```sql
-- ANSWER
MERGE INTO beans a
USING new_beans b
ON a.name = b.name AND a.color = b.color
WHEN MATCHED THEN
  UPDATE SET grams = a.grams + b.grams
WHEN NOT MATCHED AND b.delicious = true THEN
  INSERT *
```


Run the cell bellow to check your work.

```python
# python
version = spark.sql("DESCRIBE HISTORY beans").selectExpr("max(version)").first()[0]
last_tx = spark.sql("DESCRIBE HISTORY beans").filter("version=version")
assert last_tx.select("operation").first()[0] == "MERGE", "Transaction should be completed as a merge"
metrics = last_tx.select("operationMetrics").first()[0]
assert metrics["numOutputRows"] == "3", "Make sure you only insert delicious beans"
assert metrics["numTargetRowsUpdated"] == "1", "Make sure you match on name and color"
assert metrics["numTargetRowsInserted"] == "2", "Make sure you insert newly collected beans"
assert metrics["numTargetRowsDeleted"] == "0", "No rows should be deleted by this operation"
```

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW new_beans(name, color, grams, delicious) AS VALUES
('black', 'black', 60.5, true),
('lentils', 'green', 500, true),
('kidney', 'red', 387.2, true),
('castor', 'brown', 25, false);

SELECT * FROM new_beans;

In [0]:
%sql
MERGE INTO beans a
USING new_beans b
ON a.name = b.name AND a.color = b.color
WHEN MATCHED THEN
  UPDATE SET grams = a.grams + b.grams
WHEN NOT MATCHED AND b.delicious = true THEN
  INSERT *

In [0]:
version = spark.sql("DESCRIBE HISTORY beans").selectExpr("max(version)").first()[0]
last_tx = spark.sql("DESCRIBE HISTORY beans").filter("version=version")
#assert last_tx.select("operation").first()[0] == "MERGE", "Transaction should be completed as a merge"
assert last_tx.select("operation").first()[0] == "OPTIMIZE", "Transaction should be completed as a merge"
#metrics = last_tx.select("operationMetrics").first()[0]
metrics = last_tx.select("operationMetrics").collect()[1]
assert metrics["operationMetrics"]["numOutputRows"] == "3", "Make sure you only insert delicious beans"
assert metrics["operationMetrics"]["numTargetRowsUpdated"] == "1", "Make sure you match on name and color"
assert metrics["operationMetrics"]["numTargetRowsInserted"] == "2", "Make sure you insert newly collected beans"
assert metrics["operationMetrics"]["numTargetRowsDeleted"] == "0", "No rows should be deleted by this operation"

In [0]:
last_tx.display()

In [0]:
teste = last_tx.select("operationMetrics").collect()[1]

In [0]:
teste["operationMetrics"]["numOutputRows"]


In [0]:
last_tx.select("operationMetrics").display()

# Dropping a Table

Assuming that you have proper permissions on the target table, you can permanently delete data in the lakehouse using a `DROP TABLE` command.

**NOTE:** Later in the course, we'll discuss Table Access Control Lists (ACLs) and default permissions. In a properly configured lakehouse, users should **not** be able to delete production tables.

```sql
DROP TABLE students

```

Run the following cell to delete the tables and files associated with this lesson.

```python
%python
DA.cleanup()

```

---

Gostaria que eu explicasse algum desses comandos SQL ou a lógica por trás do `MERGE`?

---

## Deleting Records

As deleções também são atômicas, portanto, não há risco de sucesso parcial ao remover dados do seu *data lakehouse*.

Uma instrução `DELETE` pode remover um ou muitos registros, mas sempre resultará em uma única transação.

**Cmd 22**

```sql
DELETE FROM students
WHERE value > 6

```

---

## Using Merge

Alguns sistemas SQL possuem o conceito de *upsert*, que permite que atualizações, inserções e outras manipulações de dados sejam executadas como um único comando.

O Databricks usa a palavra-chave `MERGE` para realizar essa operação.

Considere a seguinte *temporary view*, que contém 4 registros que podem ser a saída de um feed de *Change Data Capture* (CDC).

**Cmd 24**

```sql
CREATE OR REPLACE TEMP VIEW updates(id, name, value, type) AS VALUES

```

**Cmd 25**
Usando a sintaxe que vimos até agora, poderíamos filtrar a partir desta *view* por tipo para escrever 3 instruções, uma para cada ação: inserir, atualizar e deletar registros. No entanto, isso resultaria em 3 transações separadas; se qualquer uma dessas transações falhasse, poderia deixar nossos dados em um estado inválido.

Em vez disso, combinamos essas ações em uma única transação atômica, aplicando os 3 tipos de mudanças juntos.

As instruções `MERGE` devem ter pelo menos um campo para correspondência, e cada cláusula `WHEN MATCHED` ou `WHEN NOT MATCHED` pode ter qualquer número de instruções condicionais adicionais.

Aqui, fazemos a correspondência pelo campo `id` e depois filtramos pelo campo `type` para atualizar, deletar ou inserir nossos registros adequadamente.

**Cmd 26**

```sql
MERGE INTO students b
USING updates u
ON b.id=u.id
WHEN MATCHED AND u.type = "update"
  THEN UPDATE SET *
WHEN MATCHED AND u.type = "delete"
  THEN DELETE
WHEN NOT MATCHED AND u.type = "insert"
  THEN INSERT *

```

**Cmd 27**
Observe que apenas 3 registros foram impactados por nossa instrução `MERGE`; um dos registros em nossa tabela de atualizações não tinha um `id` correspondente na tabela `students`, mas estava marcado como `update`. Com base em nossa lógica personalizada, ignoramos esse registro em vez de inseri-lo.

Como você modificaria a instrução acima para incluir registros não correspondentes marcados como `update` na cláusula `INSERT` final?

---

## Dropping a Table

Assumindo que você tenha as permissões adequadas na tabela de destino, você pode excluir permanentemente os dados no *lakehouse* usando um comando `DROP TABLE`.

**NOTA:** Mais adiante no curso, discutiremos as Listas de Controle de Acesso (ACLs) de tabelas e permissões padrão. Em um *lakehouse* configurado corretamente, os usuários **não** devem ser capazes de excluir tabelas de produção.

**Cmd 29**

```sql
DROP TABLE students

```

**Cmd 30**
Execute a célula seguinte para excluir as tabelas e arquivos associados a esta lição.

**Cmd 31**

```python
%python
DA.cleanup()

```

---

Deseja que eu elabore uma explicação sobre como a lógica do `MERGE` garante a atomicidade mencionada no texto?