## Instalação do delta

In [1]:
pip install delta-spark

Note: you may need to restart the kernel to use updated packages.


In [29]:
# Restart Kernel
from IPython import get_ipython
 
if get_ipython():
    get_ipython().kernel.do_shutdown(restart=True)

## Início do desenvolvimento

In [10]:
import pyspark
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, DateType
from delta import *
from delta.tables import DeltaTable
import datetime

In [2]:
builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [3]:
source_file_path = "/home/jovyan/work/financial_data.csv"
target_table_path = "/home/jovyan/work/delta/financial_data"
target_table_name = "financial_data"

In [4]:
'''
Carregue o arquivo financial_data.csv localizado na pasta work.
'''

df_financial_data = spark.read.options(delimiter=",", header=True).csv(source_file_path)

df_financial_data.show(100)
df_financial_data.printSchema()

+------------+--------------+-------+--------------+--------------------+
|id_transacao|data_transacao|  valor|tipo_transacao|           descricao|
+------------+--------------+-------+--------------+--------------------+
|           1|    2024-07-01|1500.45|      DEPOSITO| Depósito de Salário|
|           2|    2024-07-02|-200.32|         SAQUE|        Saque em ATM|
|           3|    2024-07-03| -50.67|     PAGAMENTO|           Cafeteria|
|           4|    2024-07-03|2000.78| TRANSFERENCIA|Transferência par...|
|           5|    2024-07-04|-100.25|     PAGAMENTO|        Supermercado|
|           6|    2024-07-04| -30.12|     PAGAMENTO|  Transporte Público|
|           7|    2024-07-05|2500.89|      DEPOSITO| Pagamento Freelance|
|           8|    2024-07-06|-300.55|         SAQUE|        Saque em ATM|
|           9|    2024-07-07| -20.30|     PAGAMENTO|   Assinatura Online|
|          10|    2024-07-07| 500.40|      DEPOSITO|Retorno de Invest...|
|          11|    2024-07-08|3200.99| 

In [5]:
'''
. Converta a coluna "valor" para o tipo de dados double.
. Filtre as transações com valor acima de 1000.
'''

df_financial_data_treated = (
    df_financial_data
        .withColumn("valor", F.col("valor").cast("double"))
        .withColumn("id_transacao", F.col("id_transacao").cast("int"))  # formato ideal
        .withColumn("data_transacao", F.col("data_transacao").cast("date")) # formato ideal
        .filter("valor > 1000")
)

df_financial_data_treated.show()
print(f"count: {df_financial_data_treated.count()}")
df_financial_data_treated.printSchema()

+------------+--------------+-------+--------------+--------------------+
|id_transacao|data_transacao|  valor|tipo_transacao|           descricao|
+------------+--------------+-------+--------------+--------------------+
|           1|    2024-07-01|1500.45|      DEPOSITO| Depósito de Salário|
|           4|    2024-07-03|2000.78| TRANSFERENCIA|Transferência par...|
|           7|    2024-07-05|2500.89|      DEPOSITO| Pagamento Freelance|
|          11|    2024-07-08|3200.99|      DEPOSITO|             Salário|
|          15|    2024-07-12|1800.75| TRANSFERENCIA|Transferência de ...|
|          20|    2024-07-17|1000.65|      DEPOSITO|               Bônus|
|          22|    2024-07-19| 2200.8| TRANSFERENCIA|Transferência par...|
|          28|    2024-07-25| 2000.1| TRANSFERENCIA|Transferência de ...|
|          30|    2024-07-27|2750.75|      DEPOSITO|             Salário|
|          34|    2024-08-01|1600.95|      DEPOSITO| Depósito de Salário|
|          37|    2024-08-03| 2150.6| 

In [6]:
'''
. Salve os dados filtrados em um Delta Lake, particionados pela data da transação, no diretório /home/jovyan/work/delta/financial_data
. Crie uma delta table chamada financial_data a partir dos dados salvos.
'''

df_financial_data_treated.write.format("delta").mode("overwrite").partitionBy("data_transacao").save(target_table_path)
spark.sql(f"CREATE TABLE IF NOT EXISTS {target_table_name} USING DELTA LOCATION '{target_table_path}'")

## ou
# df_financial_data_treated.write.format("delta").mode("overwrite").partitionBy("data_transacao").option("path", target_table_path).saveAsTable(target_table_name)

# print
spark.sql(f"SELECT * FROM {target_table_name}").show()

+------------+--------------+-------+--------------+--------------------+
|id_transacao|data_transacao|  valor|tipo_transacao|           descricao|
+------------+--------------+-------+--------------+--------------------+
|          28|    2024-07-25| 2000.1| TRANSFERENCIA|Transferência de ...|
|          37|    2024-08-03| 2150.6| TRANSFERENCIA|Transferência par...|
|           4|    2024-07-03|2000.78| TRANSFERENCIA|Transferência par...|
|          22|    2024-07-19| 2200.8| TRANSFERENCIA|Transferência par...|
|          15|    2024-07-12|1800.75| TRANSFERENCIA|Transferência de ...|
|          34|    2024-08-01|1600.95|      DEPOSITO| Depósito de Salário|
|           1|    2024-07-01|1500.45|      DEPOSITO| Depósito de Salário|
|          40|    2024-08-05| 2650.4|      DEPOSITO| Pagamento Freelance|
|           7|    2024-07-05|2500.89|      DEPOSITO| Pagamento Freelance|
|          48|    2024-08-10|1350.45|      DEPOSITO|    Reembolso Médico|
|          44|    2024-08-08|3300.75| 

In [7]:
'''
Realize uma consulta para selecionar todas as transações com valor acima de 2000.
'''

spark.sql(f"SELECT * FROM {target_table_name} WHERE valor > 2000").show()

# ou
# df_delta_financial_data = spark.read.table(target_table_name)
# df_delta_financial_data.filter("valor > 2000").show()

+------------+--------------+-------+--------------+--------------------+
|id_transacao|data_transacao|  valor|tipo_transacao|           descricao|
+------------+--------------+-------+--------------+--------------------+
|          28|    2024-07-25| 2000.1| TRANSFERENCIA|Transferência de ...|
|          37|    2024-08-03| 2150.6| TRANSFERENCIA|Transferência par...|
|           4|    2024-07-03|2000.78| TRANSFERENCIA|Transferência par...|
|          22|    2024-07-19| 2200.8| TRANSFERENCIA|Transferência par...|
|          40|    2024-08-05| 2650.4|      DEPOSITO| Pagamento Freelance|
|           7|    2024-07-05|2500.89|      DEPOSITO| Pagamento Freelance|
|          44|    2024-08-08|3300.75|      DEPOSITO|         Bônus Anual|
|          11|    2024-07-08|3200.99|      DEPOSITO|             Salário|
|          30|    2024-07-27|2750.75|      DEPOSITO|             Salário|
+------------+--------------+-------+--------------+--------------------+



In [8]:
'''
Exiba o histórico de versões da delta table.
'''

spark.sql(f"DESCRIBE HISTORY {target_table_name}").show()

+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      0|2024-08-01 21:05:...|  NULL|    NULL|    WRITE|{mode -> Overwrit...|NULL|    NULL|     NULL|       NULL|  Serializable|        false|{numFiles -> 14, ...|        NULL|Apache-Spark/3.5....|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+



In [11]:
'''
Adicione os seguintes novos dados à tabela Delta:

- id_transacao: 11, data_transacao: '2024-07-08', valor: 3000.95, tipo_transacao: 'DEPOSITO', descricao: 'Bônus Anual'
- id_transacao: 12, data_transacao: '2024-07-08', valor: -150.40, tipo_transacao: 'PAGAMENTO', descricao: 'Restaurante'
- id_transacao: 13, data_transacao: '2024-07-09', valor: 800.75, tipo_transacao: 'TRANSFERENCIA', descricao: 'Transferência da Conta Corrente'
- id_transacao: 14, data_transacao: '2024-07-09', valor: -200.22, tipo_transacao: 'PAGAMENTO', descricao: 'Cinema'
'''

data = [
    (11, datetime.datetime.strptime('2024-07-08', "%Y-%m-%d").date(), 3000.95, 'DEPOSITO', 'Bônus Anual'),
    (12, datetime.datetime.strptime('2024-07-08', "%Y-%m-%d").date(), -150.40, 'PAGAMENTO', 'Restaurante'),
    (13, datetime.datetime.strptime('2024-07-09', "%Y-%m-%d").date(), 800.75, 'TRANSFERENCIA', 'Transferência da Conta Corrente'),
    (14, datetime.datetime.strptime('2024-07-09', "%Y-%m-%d").date(), -200.22, 'PAGAMENTO', 'Cinema'),
    (15, datetime.datetime.strptime('2024-07-09', "%Y-%m-%d").date(), -200.22, 'PAGAMENTO', 'Cinema'),
    (992, datetime.datetime.strptime('2024-07-09', "%Y-%m-%d").date(), -200.22, 'PAGAMENTO', 'Cinema'),
    (993, datetime.datetime.strptime('2024-07-09', "%Y-%m-%d").date(), 200.22, 'PAGAMENTO', 'Cinema')
]
schema = StructType([
    StructField("id_transacao", IntegerType(), True),
    StructField("data_transacao", DateType(), True),
    StructField("valor", DoubleType(), True),
    StructField("tipo_transacao", StringType(), True),
    StructField("descricao", StringType(), True)
])

df_new_data = spark.createDataFrame(data, schema)


''' 
    De acordo com a descrição "Adicione os seguintes novos dados à tabela Delta" a palavra 
    "Adicione" tem muitos significados, mas pelo meu entendimento é mais próxima da palavra "Acrescentar",
    ou seja, "inserir". Por isso eu priorizei o "append". Contudo, como tem um "id_transacao", 
    eu posso entender também que o objetivo seria mesclar os dados. Sendo assim, eu montei também
    o código mais abaixo que está comentado.
'''

## Se for insert... 
df_new_data.write.format("delta").mode("append").saveAsTable(target_table_name)

## Se for merge...
# delta_table_financial_data = DeltaTable.forPath(spark, target_table_path)
# (
#     delta_table_financial_data.alias("old")
#         .merge(
#             df_new_data.alias("new"),
#             "old.id_transacao = new.id_transacao"
#         )
#         .whenMatchedUpdateAll()
#         .whenNotMatchedInsertAll()
#         .execute()
# )

In [12]:
'''
Realize uma consulta para selecionar todas as transações com valor acima de 1000.
'''

spark.sql(f"""
    SELECT * 
    FROM {target_table_name} 
    WHERE valor > 1000
""").show()

+------------+--------------+-------+--------------+--------------------+
|id_transacao|data_transacao|  valor|tipo_transacao|           descricao|
+------------+--------------+-------+--------------+--------------------+
|          28|    2024-07-25| 2000.1| TRANSFERENCIA|Transferência de ...|
|          37|    2024-08-03| 2150.6| TRANSFERENCIA|Transferência par...|
|           4|    2024-07-03|2000.78| TRANSFERENCIA|Transferência par...|
|          22|    2024-07-19| 2200.8| TRANSFERENCIA|Transferência par...|
|          15|    2024-07-12|1800.75| TRANSFERENCIA|Transferência de ...|
|          34|    2024-08-01|1600.95|      DEPOSITO| Depósito de Salário|
|           1|    2024-07-01|1500.45|      DEPOSITO| Depósito de Salário|
|          40|    2024-08-05| 2650.4|      DEPOSITO| Pagamento Freelance|
|           7|    2024-07-05|2500.89|      DEPOSITO| Pagamento Freelance|
|          48|    2024-08-10|1350.45|      DEPOSITO|    Reembolso Médico|
|          11|    2024-07-08|3000.95| 

In [13]:
'''
Agrupe as transações por tipo e some os valores.
'''

spark.sql(f"""
    SELECT tipo_transacao, ROUND(SUM(valor), 2) valor 
    FROM {target_table_name} 
    GROUP BY tipo_transacao
""").show()

+--------------+--------+
|tipo_transacao|   valor|
+--------------+--------+
| TRANSFERENCIA|10953.78|
|      DEPOSITO|22857.23|
|     PAGAMENTO| -550.84|
+--------------+--------+

