### Bloco 1 - Configuração do Spark com suporte ao Delta lake

In [None]:
import pyspark
from delta import *

In [None]:
builder = pyspark.sql.SparkSession.builder \
    .appName("TestePraticoJordan") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

### Bloco 2 - Leitura do arquivo

In [None]:
PATH = 'work'
FILENAME = 'financial_data.csv'

In [None]:
df = spark.read.format('csv').option('header', True).load(f'{PATH}/{FILENAME}')

### Bloco 3 - Convertendo coluna "valor" para double 

In [None]:
df = df.withColumn('valor', col('valor').cast('double'))

### Bloco 4 - Filtrando transações com valores acima de 1000

In [None]:
df = df.filter(col('valor') > 1000)

### Bloco 5 - Salvando dados particionados por data transação em Delta Lake

In [None]:
OUTPUT_PATH = '/home/jovyan/work/delta/financial_data'

In [None]:
df.write \
    .format('delta') \
    .partitionBy('data_transacao') \
    .save(OUTPUT_PATH)

### Bloco 6 - Criando delta table a partir dos dados salvos

In [None]:
INPUT_PATH = OUTPUT_PATH

In [None]:
financial_data = DeltaTable.forPath(spark, INPUT_PATH)

### Bloco 7 - Consulta de transações com valor acima de 2000

In [None]:
df_financial_data = financial_data.toDF()
df_financial_data.filter(col('valor') > 2000).show()

### Bloco 8 - Exibindo histórico de versões da delta table

In [None]:
financial_data.history().show()

### Bloco 9 - Adicionando dados

In [None]:
new_data = [
    {'id_transacao': 11, 'data_transacao': '2024-07-08', 'valor': 3000.95, 'tipo_transacao': 'DEPOSITO', 'descricao': 'Bônus Anual'},
    {'id_transacao': 12, 'data_transacao': '2024-07-08', 'valor': -150.40, 'tipo_transacao': 'PAGAMENTO', 'descricao': 'Restaurante'},
    {'id_transacao': 13, 'data_transacao': '2024-07-09', 'valor': 800.75, 'tipo_transacao': 'TRANSFERENCIA', 'descricao': 'Transferência da Conta Corrente'},
    {'id_transacao': 14, 'data_transacao': '2024-07-09', 'valor': -200.22, 'tipo_transacao': 'PAGAMENTO', 'descricao': 'Cinema'},
]

df_new_data = spark.createDataFrame(new_data)

financial_data.alias('old') \
    .merge(
        df_new_data.alias('new'),
        'old.id_transacao = new.id_transacao'
    ) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

df_financial_data = financial_data.toDF()

### Bloco 10 - Realizando consultas selecionando transações com valor superior a 1000

In [None]:
df_financial_data.filter(col('valor') > 1000).show()

### Bloco 11 - Agrupando transações por tipo, somando valores

In [None]:
df_financial_data \
    .groupBy(col('tipo_transacao')) \
    .agg(format_number(sum('valor'), 3).alias('valor_total')) \
    .select('tipo_transacao', 'valor_total') \
    .show()