#######Primeiro Script , trabalhando com carga de dados e ajuste de registros no Delta Lake
##Carrega os dados do arquivo Json

In [0]:
%python
dfcliente = spark.read.json("/FileStore/tables/cliente/clientes.json");
dfcliente.printSchema()
dfcliente.show()

root
 |-- customer: string (nullable = true)
 |-- date_order: string (nullable = true)
 |-- id: long (nullable = true)
 |-- price: double (nullable = true)
 |-- product: string (nullable = true)
 |-- unit: long (nullable = true)

+--------+----------+---+-----+---------------+----+
|customer|date_order| id|price|        product|unit|
+--------+----------+---+-----+---------------+----+
|  Carlos|2021-01-23|  1|238.0|             TV|   5|
|     Ana|2021-03-22|  2|121.6|AR-CONDICIONADO|   6|
|   Sofia|2021-04-21|  3|415.4|        FREEZER|   7|
|  Sandra|2021-04-23|  4|313.0|             TV|   8|
|  Tereza|2021-05-23|  5|412.0|       FRIGOBAR|   3|
|   Carla|2021-06-25|  6|124.0|           MESA|   1|
|   Sofia|2021-07-25|  7|342.3|        CADEIRA|   1|
+--------+----------+---+-----+---------------+----+



##Cria a tabela temporária com os dos do arquivo json em memória

In [0]:
%python
dfcliente.createOrReplaceTempView("compras_view");
saida =spark.sql("SELECT * FROM compras_view")
saida.show()

+--------+----------+---+-----+---------------+----+
|customer|date_order| id|price|        product|unit|
+--------+----------+---+-----+---------------+----+
|  Carlos|2021-01-23|  1|238.0|             TV|   5|
|     Ana|2021-03-22|  2|121.6|AR-CONDICIONADO|   6|
|   Sofia|2021-04-21|  3|415.4|        FREEZER|   7|
|  Sandra|2021-04-23|  4|313.0|             TV|   8|
|  Tereza|2021-05-23|  5|412.0|       FRIGOBAR|   3|
|   Carla|2021-06-25|  6|124.0|           MESA|   1|
|   Sofia|2021-07-25|  7|342.3|        CADEIRA|   1|
+--------+----------+---+-----+---------------+----+



##Carrega os dados no Delta Lake gerando uma tabela chamada compras, note USING DELTA

In [0]:
%sql
DROP TABLE if exists compras;

In [0]:
%fs rm -r /user/hive/warehouse/compras

In [0]:
%scala
val scrisql = "CREATE OR REPLACE TABLE compras (id STRING, date_order STRING,customer STRING,product STRING,unit INTEGER,price DOUBLE) USING DELTA PARTITIONED BY (date_order) ";

spark.sql(scrisql);

##Lista os dados do Delta Lake, que estará vazia

In [0]:
%scala
spark.sql("select * from compras").show()

##Criando um merge para carregar os dados da tabela temporário no Delta Lake

In [0]:
%scala
val mergedados = "Merge into compras " +
"using compras_view as cmp_view " +
"ON compras.id = cmp_view.id " +
"WHEN MATCHED THEN " +
"UPDATE SET compras.product = cmp_view.product," +
"compras.price = cmp_view.price " +
"WHEN NOT MATCHED THEN INSERT * ";

spark.sql(mergedados);

## Exibe os dados que foram carregados com o merge

In [0]:
%scala
spark.sql("select * from compras").show();

##Atualiza os dados do id=4 com o comando update

In [0]:
%scala
val atualiza_dados = "update compras " +
"set product = 'Geladeira' " +
"where id =4";
spark.sql(atualiza_dados);

## Exibe os dados que foram carregados, note a atualização no id=4

In [0]:
%scala
spark.sql("select * from compras").show();

## Eliminação do registro cujo o id=1

In [0]:
%scala
val deletaregistro = "delete from compras where id = 1";
spark.sql(deletaregistro);

## Exibe os dados que foram carregados

In [0]:
%scala
spark.sql("select * from compras").show();