# Caso de uso: substituição de dados a partir de uma data

Nesse caso devemos atualizar dados a partir de uma fonte que pode ter seus registros eliminados fisicamente, não há qualquer 
sinalização quando um registro é eliminado.
Entretanto temos uma data de referencia (DT_REFERENCIA) e sabemos que o dados não muda ou é eliminado x dias atras.
A carga é desacoplada, tratada em duas etapas.
 
## Requisitos:

### Primeira Etapa (2raw) - não mostrada aqui

- Carga de dados desacoplada, na primeira etapa seleciona os dados dos ultimos n dias a partir de uma data (DT_REFERENCIA) e gravar em um arquivo.
- Os arquivos devem ser guardados de forma cronológica, particionados pela data de ingestão (DateIngest) seus nomes devem indicar sua cronologia
- Exemplo: Carga_xyz_yyyyMMDD_hhmmss.snappy.parquet

### Segunda Etapa (raw2cleansed) - Nesse exemplo

- Pegar tudo que foi carregado desde a ultima execução (a partir de DateIngest), aqui mostramos uma forma simplista
- Garantimos que em cada arquivo entregue pela ingestão é completo, 
  os dados de uma data remessa presente no arquivo mais novo é o que conta, os demais devem ser descartados.
 


In [None]:
import pytz, itertools, unicodedata, json, calendar
from datetime import datetime, timezone, timedelta, date
from delta.tables import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *
#from functools import reduce

In [None]:
# Obtem o dado da camada de ingestão

last_exec_date = date.today() # Só um exemplo, Usar para isso um sistema de controle de mudança de data
key_fields = ["NF", "SERIE_NF"]

my_sourch_path = "/path/to/source"

df = spark.read \
      .csv(my_sourch_path, header=True) \
      .withColumn('SourceFileName', input_file_name())\
      .withColumn('ANO_REFERENCIA', substring("DT_REFERENCIA", 1,4)) \
      .filter(col("DateIngest") >= last_exec_date)

In [None]:
# para cada data de referencia elimina duplicidades pela pelo nome do arquivo,
# assim ficamos apenas com o dado do ultimo arquivo.
# O nome do arquivo pode ser considerado como ordem cronológica.
# dbfs:/mnt/sadatalakevcbr/raw/Engemix/Command/DadosCPS/YearIngest=2021/MonthIngest=09/DateIngest=2021-09-24/DadosCPS_UTC2021-09-24%20040023__2021-08-10_2021-09-24.csv|2021-09-24 04:00:24 

#df.groupBy(['SOURCEFILENAME', "TS_UTC_ADF_INGESTION"]).count().orderBy(['SOURCEFILENAME', "TS_UTC_ADF_INGESTION"]).show(1000, False)

w = Window.partitionBy('DT_REFERENCIA')
df = df.withColumn('maxSourceFileName', max('SourceFileName').over(w)) \
       .where(col('maxSourceFileName') == col('SourceFileName')) \
       .drop('maxSourceFileName')


In [None]:
# Remove duplicidades pelos campos chave levando em consideração algum outro campo
window = Window.partitionBy(key_fields).orderBy(desc(col('DATA_ULT_ATUALIZACAO')))
df = df.withColumn("rank_temp", row_number().over(window)).filter(col("rank_temp") == 1).drop("rank_temp") 

In [None]:
# Checagem:::: Busca Duplicidades, não deve mostrar nenhum registro

#insert_order_fields = "TS_UTC_ADF_INGESTION" # (yyyy-MM-dd HH:mm:ss)
#window = Window.partitionBy(key_fields).orderBy([desc(order_field) for order_field in insert_order_fields])
window = Window.partitionBy(key_fields)
df.withColumn("num_temp", lit(1))\
  .withColumn("quant_linhas", sum(col("num_temp")).over(window))\
  .drop("num_temp")\
  .filter(col("quant_linhas") > 1)\
  .show()


In [None]:
# Criacao da lista de novas datas a serem inseridas
lista_data_remessa_excluir = [x.DT_REFERENCIA for x in df.select(col("DATA_REMESSA").cast("string")).distinct().orderBy("DATA_REMESSA").collect()]
lista_data_remessa_excluir = ','.join([f"'{x}'" for x in lista_data_remessa_excluir]) # Ex: datas = '"2020-01-01", "2020-01-02"'
print(lista_data_remessa_excluir)

In [None]:
# Grava os dados no camada Cleansed
# antes elimina dados no destino que estejam na lista de datas que vamos inserir. 
# Com isso resolvemos o problema de registros excluídos na origem sendo mantidos no Data Lake
# Vamos exluir e carregar novamente

# obtem o path do destino
path_dados_cps_cleansed = ''
print(path_dados_cps_cleansed)

if DeltaTable.isDeltaTable(spark, path_dados_cps_cleansed):
  print ("Atualizando delta table, passo 1/2: Eliminar dados antigos")
  delta_table = DeltaTable.forPath(spark, path_dados_cps_cleansed)
  delta_table.delete("DT_REFERENCIA in ({0})".format(lista_data_remessa_excluir))
  print ("Atualizando delta table, passo 2/2: Gravar dados novos")
  df.write.format("delta").mode("append").partitionBy('ANO_REFERENCIA').save(path_dados_cps_cleansed)
else:
  print ("Criando delta table")
  df.write.format("delta").partitionBy('ANO_REFERENCIA').save(path_dados_cps_cleansed)
