In [1]:
import pyspark
from delta import *
from delta.tables import DeltaTable


# Se você precisar adicionar ou alterar partições, considere métodos como merge ou outras operações.


builder = pyspark.sql.SparkSession.builder.appName("Teste Prático")\
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\

spark = configure_spark_with_delta_pip(builder).getOrCreate()
    

In [2]:

# Carregar o arquivo CSV
df = spark.read.csv('/home/jovyan/work/financial_data.csv', header=True, inferSchema=True)
df.show()

+------------+--------------+-------+--------------+--------------------+
|id_transacao|data_transacao|  valor|tipo_transacao|           descricao|
+------------+--------------+-------+--------------+--------------------+
|           1|    2024-07-01|1500.45|      DEPOSITO| Depósito de Salário|
|           2|    2024-07-02|-200.32|         SAQUE|        Saque em ATM|
|           3|    2024-07-03| -50.67|     PAGAMENTO|           Cafeteria|
|           4|    2024-07-03|2000.78| TRANSFERENCIA|Transferência par...|
|           5|    2024-07-04|-100.25|     PAGAMENTO|        Supermercado|
|           6|    2024-07-04| -30.12|     PAGAMENTO|  Transporte Público|
|           7|    2024-07-05|2500.89|      DEPOSITO| Pagamento Freelance|
|           8|    2024-07-06|-300.55|         SAQUE|        Saque em ATM|
|           9|    2024-07-07|  -20.3|     PAGAMENTO|   Assinatura Online|
|          10|    2024-07-07|  500.4|      DEPOSITO|Retorno de Invest...|
|          11|    2024-07-08|3200.99| 

In [3]:
from pyspark.sql.functions import col

# Converter a coluna ‘valor’ para o tipo de dados double
df = df.withColumn("valor", col("valor").cast("double"))
df.printSchema()

root
 |-- id_transacao: integer (nullable = true)
 |-- data_transacao: date (nullable = true)
 |-- valor: double (nullable = true)
 |-- tipo_transacao: string (nullable = true)
 |-- descricao: string (nullable = true)



In [4]:
# Filtrar as transações com valor acima de 1000
df_filtered = df.filter(col("valor") > 1000)
df_filtered.show()


+------------+--------------+-------+--------------+--------------------+
|id_transacao|data_transacao|  valor|tipo_transacao|           descricao|
+------------+--------------+-------+--------------+--------------------+
|           1|    2024-07-01|1500.45|      DEPOSITO| Depósito de Salário|
|           4|    2024-07-03|2000.78| TRANSFERENCIA|Transferência par...|
|           7|    2024-07-05|2500.89|      DEPOSITO| Pagamento Freelance|
|          11|    2024-07-08|3200.99|      DEPOSITO|             Salário|
|          15|    2024-07-12|1800.75| TRANSFERENCIA|Transferência de ...|
|          20|    2024-07-17|1000.65|      DEPOSITO|               Bônus|
|          22|    2024-07-19| 2200.8| TRANSFERENCIA|Transferência par...|
|          28|    2024-07-25| 2000.1| TRANSFERENCIA|Transferência de ...|
|          30|    2024-07-27|2750.75|      DEPOSITO|             Salário|
|          34|    2024-08-01|1600.95|      DEPOSITO| Depósito de Salário|
|          37|    2024-08-03| 2150.6| 

In [5]:
# Salvar os dados filtrados em um Delta Lake, particionados pela data da transação
df_filtered.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("data_transacao") \
    .save("/home/jovyan/work/delta/financial_data")


In [7]:
from delta.tables import DeltaTable

# Especificando o caminho onde os dados Delta estão armazenados
delta_path = "/home/jovyan/work/delta/financial_data"

# Criando a Delta Table a partir dos dados salvos
spark.sql(f"""
    CREATE TABLE financial_data
    USING DELTA
    LOCATION '{delta_path}'
""")


DataFrame[]

In [9]:
# Realizar uma consulta para selecionar todas as transações com valor acima de 2000
df_high_value = spark.sql("SELECT * FROM financial_data WHERE valor > 2000")
df_high_value.show()


+------------+--------------+-------+--------------+--------------------+
|id_transacao|data_transacao|  valor|tipo_transacao|           descricao|
+------------+--------------+-------+--------------+--------------------+
|          28|    2024-07-25| 2000.1| TRANSFERENCIA|Transferência de ...|
|          37|    2024-08-03| 2150.6| TRANSFERENCIA|Transferência par...|
|           4|    2024-07-03|2000.78| TRANSFERENCIA|Transferência par...|
|          22|    2024-07-19| 2200.8| TRANSFERENCIA|Transferência par...|
|          40|    2024-08-05| 2650.4|      DEPOSITO| Pagamento Freelance|
|           7|    2024-07-05|2500.89|      DEPOSITO| Pagamento Freelance|
|          44|    2024-08-08|3300.75|      DEPOSITO|         Bônus Anual|
|          11|    2024-07-08|3200.99|      DEPOSITO|             Salário|
|          30|    2024-07-27|2750.75|      DEPOSITO|             Salário|
+------------+--------------+-------+--------------+--------------------+



In [10]:
# Exibir o histórico de versões da Delta Table
delta_table = DeltaTable.forPath(spark, "/home/jovyan/work/delta/financial_data")
delta_table.history().show()


+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      0|2024-08-07 20:22:...|  NULL|    NULL|    WRITE|{mode -> Overwrit...|NULL|    NULL|     NULL|       NULL|  Serializable|        false|{numFiles -> 14, ...|        NULL|Apache-Spark/3.5....|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+



In [14]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# Definindo o esquema dos novos dados
schema = StructType([
    StructField("id_transacao", IntegerType(), True),
    StructField("data_transacao", StringType(), True),
    StructField("valor", DoubleType(), True),
    StructField("tipo_transacao", StringType(), True),
    StructField("descricao", StringType(), True)
])

# Criando um DataFrame com os novos dados
new_data = [
    (11, '2024-07-08', 3000.95, 'DEPOSITO', 'Bônus Anual'),
    (12, '2024-07-08', -150.40, 'PAGAMENTO', 'Restaurante'),
    (13, '2024-07-09', 800.75, 'TRANSFERENCIA', 'Transferência da Conta Corrente'),
    (14, '2024-07-09', -200.22, 'PAGAMENTO', 'Cinema')
]

new_df = spark.createDataFrame(new_data, schema)

# Convertendo a coluna 'data_transacao' para DateType
new_df = new_df.withColumn("data_transacao", to_date(col("data_transacao"), "yyyy-MM-dd"))


# Adicionando os novos dados à tabela Delta
new_df.write \
    .format("delta") \
    .mode("append") \
    .save(delta_path)

# Verificando se os dados foram adicionados corretamente
spark.sql(f"SELECT * FROM delta.`{delta_path}`").show()


+------------+--------------+-------+--------------+--------------------+
|id_transacao|data_transacao|  valor|tipo_transacao|           descricao|
+------------+--------------+-------+--------------+--------------------+
|          13|    2024-07-09| 800.75| TRANSFERENCIA|Transferência da ...|
|          28|    2024-07-25| 2000.1| TRANSFERENCIA|Transferência de ...|
|          37|    2024-08-03| 2150.6| TRANSFERENCIA|Transferência par...|
|           4|    2024-07-03|2000.78| TRANSFERENCIA|Transferência par...|
|          22|    2024-07-19| 2200.8| TRANSFERENCIA|Transferência par...|
|          15|    2024-07-12|1800.75| TRANSFERENCIA|Transferência de ...|
|          34|    2024-08-01|1600.95|      DEPOSITO| Depósito de Salário|
|           1|    2024-07-01|1500.45|      DEPOSITO| Depósito de Salário|
|          40|    2024-08-05| 2650.4|      DEPOSITO| Pagamento Freelance|
|           7|    2024-07-05|2500.89|      DEPOSITO| Pagamento Freelance|
|          48|    2024-08-10|1350.45| 

In [15]:
# Realizar uma consulta para selecionar todas as transações com valor acima de 1000
df_high_value_updated = spark.sql("SELECT * FROM financial_data WHERE valor > 1000")
df_high_value_updated.show()


+------------+--------------+-------+--------------+--------------------+
|id_transacao|data_transacao|  valor|tipo_transacao|           descricao|
+------------+--------------+-------+--------------+--------------------+
|          28|    2024-07-25| 2000.1| TRANSFERENCIA|Transferência de ...|
|          37|    2024-08-03| 2150.6| TRANSFERENCIA|Transferência par...|
|           4|    2024-07-03|2000.78| TRANSFERENCIA|Transferência par...|
|          22|    2024-07-19| 2200.8| TRANSFERENCIA|Transferência par...|
|          15|    2024-07-12|1800.75| TRANSFERENCIA|Transferência de ...|
|          34|    2024-08-01|1600.95|      DEPOSITO| Depósito de Salário|
|           1|    2024-07-01|1500.45|      DEPOSITO| Depósito de Salário|
|          40|    2024-08-05| 2650.4|      DEPOSITO| Pagamento Freelance|
|           7|    2024-07-05|2500.89|      DEPOSITO| Pagamento Freelance|
|          48|    2024-08-10|1350.45|      DEPOSITO|    Reembolso Médico|
|          11|    2024-07-08|3000.95| 

In [17]:
# Carregando a tabela Delta
df = spark.read.format("delta").load(delta_path)

# Registrando o DataFrame como uma tabela temporária para consulta SQL
df.createOrReplaceTempView("financial_data")

# Executando a consulta SQL para agrupar as transações por tipo e somar os valores
result_df = spark.sql("""
    SELECT tipo_transacao, SUM(valor) as total_valor
    FROM financial_data
    GROUP BY tipo_transacao
""")

# Mostrando o resultado
result_df.show()

# Parando a sessão Spark
spark.stop()

+--------------+------------------+
|tipo_transacao|       total_valor|
+--------------+------------------+
| TRANSFERENCIA|10953.779999999999|
|      DEPOSITO|22857.230000000003|
|     PAGAMENTO|           -350.62|
+--------------+------------------+

