#####O que é Delta Lake?

In [0]:
# Resumo de benefícios do Delta Lake 
"""
Delta Lake : é a camada de armazenamento otimizada que fornece a base para armazenar dados e tabelas na Plataforma Databricks Lakehouse

Lakehouse  : é o ambiente onde fica armazenado arquivos\tabelas  no formato Delta

Diferença básica de Delta lake e D'ata lake :  o que acontece quando apaga dados dos 2 ambientes ? voce sabe ?

Transações ACID:significa atomicidade, consistência, isolamento e durabilidade.
Atomicidade: significa que todas as transações têm êxito ou falham completamente.
Consistência: estão relacionadas a como um determinado estado dos dados é observado por operações simultâneas.
Isolamento: se refere a como as operações simultâneas podem entrar em conflito entre si.
Durabilidade: significa que as alterações confirmadas são permanentes.


Benefícios:
Armazenamento otimizado: O Delta Lake é o software de código aberto que estende arquivos de dados Parquet com log e extramamente eficiente e compactado

Segurança e controle do fluxo de dados e reculperação se nescessário 

Versionamento dos dados: 
Cada gravação em uma tabela Delta cria uma versão da tabela. Você pode usar o log de transações para examinar modificações em sua tabela e consultar versões anteriores Você pode recuperar informações de operações, usuário, carimbo de data/hora e assim por diante relativas a cada gravação em uma tabela do Delta por meio do comando history. As operações são retornadas em ordem cronológica inversa.
A retenção do histórico de tabelas é determinada pela configuração da tabela delta.logRetentionDuration, que é de 30 dias por padrão.

Exemplo de data lake
https://www.databricks.com/br/glossary/data-lakehouse



"""

In [0]:
#Salvando em tabela Delta simples para verificarmos os logs de transações 
spark.read.json("/Volumes/workspace/default/arquivos-cursoudemy/Anac/V_OCORRENCIA_AMPLA.json") \
    .write.format("delta")\
    .mode("overwrite")\
    .saveAsTable("anac")


In [0]:
%sql
select * from anac

In [0]:
%sql 
delete from anac
where Aerodromo_de_Origem is null

In [0]:
%sql 
-- testando o delet
select * from anac
where Aerodromo_de_Origem is null


#####Versionamento, verificando e recuperando dados 

Obs : A retenção do histórico de tabelas é determinada pela configuração da tabela delta.logRetentionDuration, que é de 30 dias por padrão

In [0]:
%sql
-- Ver logs transacionais
DESCRIBE HISTORY  anac 

In [0]:
%sql
-- Ver logs transacionais  pelo caminho hive
DESCRIBE DETAIL anac

In [0]:
%sql 
--Vendo versao de log na prática 
select * from anac VERSION AS OF 0

In [0]:
%sql
--- restaurando uma versao (fazer consulta antes de restaurar para ver funcionando na prática )
select * from anac where Aerodromo_de_Origem is null

 

In [0]:
%sql
RESTORE TABLE anac TO VERSION AS OF 0

In [0]:
%sql
DESCRIBE HISTORY anac

####Otimização de dados (Partição)

In [0]:
#Salvando em modo particionado 
#Obs: particionar o que realmente for nescessárrio para otimizar a consulta
spark.read.json("/Volumes/workspace/default/arquivos-cursoudemy/Anac/V_OCORRENCIA_AMPLA.json") \
    .write.format("delta")\
    .partitionBy("UF") \
    .mode("overwrite")\
    .saveAsTable("anac_particionado")

In [0]:
%sql
select * from anac_particionado

In [0]:
%sql
select * from anac_particionado
where UF = 'MG'

In [0]:
%sql 
delete from  anac_particionado
where Aerodromo_de_Destino = 'SBMK' 

In [0]:
%sql
DESCRIBE HISTORY anac_particionado

####Otimizando Consulta 

In [0]:
spark.read.json("/Volumes/workspace/default/arquivos-cursoudemy/Anac/V_OCORRENCIA_AMPLA.json") \
    .write.mode("overwrite")\
    .saveAsTable("anac_normal")

In [0]:
%sql
select
replace(Aerodromo_de_Origem,"Sem Origem") as Origem,
Fase_da_Operacao,
Danos_a_Aeronave,
Municipio,
UF as Estado,
Operacao,
Regiao,
count(Matricula)  as quantidade
 from anac_normal
 group by 1,2,3,4,5,6,7
 having Origem is not null
 and Danos_a_Aeronave = 'Nenhum'

 order by 1,2

#####Formato Otimizado

In [0]:
#salvando no formato delta 
spark.read.json("/Volumes/workspace/default/arquivos-cursoudemy/Anac/V_OCORRENCIA_AMPLA.json") \
    .write.format("delta") \
    .partitionBy("Danos_a_Aeronave") \
    .mode("overwrite") \
    .saveAsTable("anac_delta_novo")


In [0]:
"""
OPTIMIZE nometbela  
ZORDER BY (nomecolunaordernar) obs : cria tipo um indice  nas colunas   pode ser mais de 1 (tudo tem que testar)
"""'

In [0]:
"""

se der erro (IllegalArgumentException: Danos_a_Aeronave is a partition column. Z-Ordering can only be performed on data columns) quer dizer que a otimização Z-Ordering que você está tentando aplicar não será eficaz porque as estatísticas não estão sendo coletadas para as colunas especificadas ou então esta tentando fazer pela partição (no nosso exemplo foi criado a partição Danos_a_Aeronave nao posso fazer ZORDER BY por ela  )
%sql set spark.databricks.delta.optimize.zorder.checkStatsCollection.enabled = False;

"""

In [0]:
%sql
select
replace(Aerodromo_de_Origem,"Sem Origem") as Origem,
Fase_da_Operacao,
Danos_a_Aeronave,
Municipio,
UF as Estado,
Operacao,
Regiao,
count(Matricula)  as quantidade
 from anac_delta_novo
 group by 1,2,3,4,5,6,7
 having Origem is not null
 and Danos_a_Aeronave = 'Nenhum'

 order by 1,2