In [None]:
import pandas as pd
import numpy as np
import plotly.express as px
from src.data.data_processed.data_exploration import  executar_pipeline
from pyspark.sql.window import Window
from pyspark.sql.functions import col
import matplotlib.pyplot as plt
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from concurrent.futures import ThreadPoolExecutor
import multiprocessing
import psutil
from src.visualizations.plot_btc import plot_transaction_fee_time_series, plot_btc_total_hourly, plot_btc_moved_hourly, plot_transaction_distribution, plot_btc_blocks_per_day


In [None]:
num_cores = psutil.cpu_count(logical=False)  
# --> Obtém o número de núcleos físicos do processador / Gets the number of physical CPU cores <--

NUM_PARTITIONS = max(num_cores * 2, 16)  
# --> Ajusta o número de partições para o dobro dos núcleos físicos ou no mínimo 16, dependendo do maior valor / Adjusts the number of partitions to double the physical cores or at least 16, depending on which is greater <--

print(f"Número de partições ajustado para: {NUM_PARTITIONS}")
# --> Exibe o número de partições ajustado / Displays the adjusted number of partitions <--

In [None]:
spark = SparkSession.builder \
    .appName("Corrigir_Parquet") \
    .config("spark.sql.shuffle.partitions", str(NUM_PARTITIONS)) \
    .config("spark.default.parallelism", "6") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.memory.fraction", "0.80") \
    .config("spark.memory.storageFraction", "0.5") \
    .config("spark.sql.files.maxPartitionBytes", "128MB") \
    .config("spark.cleaner.referenceTracking.cleanCheckpoints", "false") \
    .config("spark.executor.heartbeatInterval", "60000ms") \
    .config("spark.task.cpus", "1") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .master("local[*]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

print("\n SparkSession configurada com sucesso!")


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 63846)
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/python@3.13/3.13.2/Frameworks/Python.framework/Versions/3.13/lib/python3.13/socketserver.py", line 318, in _handle_request_noblock
    self.process_request(request, client_address)
    ~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.13/3.13.2/Frameworks/Python.framework/Versions/3.13/lib/python3.13/socketserver.py", line 349, in process_request
    self.finish_request(request, client_address)
    ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.13/3.13.2/Frameworks/Python.framework/Versions/3.13/lib/python3.13/socketserver.py", line 362, in finish_request
    self.RequestHandlerClass(request, client_address, self)
    ~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.13/3.13.2/Frameworks/Python.fra

In [None]:
await executar_pipeline()

In [None]:
PASTA_BLOCKS = "/Users/rodrigocampos/Library/Mobile Documents/com~apple~CloudDocs/requests/blockchain_blocks_part"
df_blocks = spark.read.parquet(PASTA_BLOCKS)


<img src="https://upload.wikimedia.org/wikipedia/commons/4/46/Bitcoin.svg" alt="Logo do Bitcoin" width="100" style="margin-right: 30px;">
<font size="15">₿locks</font>

root  
 |-- block_height: long  ------------------------> Representa a altura do bloco na blockchain, ou seja, o número sequencial de cada bloco na cadeia.  
 |-- timestamp: timestamp_ntz  ----------------> A coluna timestamp registra o momento exato em que um bloco foi minerado e adicionado à blockchain.  
 |-- block_hash: string  ------------------------> Representa o identificador único do bloco, gerado a partir do hash criptográfico do bloco atual, garantindo a integridade e segurança da blockchain.  
 |-- num_transactions: long  -------------------> Indica quantas transações foram incluídas em cada bloco.   
 |-- total_btc_moved: double  -----------------> Representa a quantidade total de Bitcoin transacionada dentro de um bloco, ou seja, a soma de todos os valores de entrada e saída das transações contidas nesse bloco.  
 |-- block_reward: double  --------------------> Representa a recompensa que o minerador recebe ao minerar um bloco.  
 |-- total_fees: double  ------------------------> Representa o total de taxas pagas pelos usuários para incluir transações em um bloco.   
 |-- block_value_btc: double  -----------------> Representa o valor total gerado em um bloco, ou seja, a soma da recompensa do bloco (block_reward) e das taxas de transação (total_fees). 



##### 🔶 ₿ -----> block_height

In [None]:
df_blocks.show(5)
df_blocks.printSchema()

In [None]:
# Ordenar os dados por block_height
df_blocks = df_blocks.orderBy("block_height")

In [None]:
menor_bloco = df_blocks.select("block_height").orderBy("block_height").first()[0]
# --> Seleciona o menor número de bloco presente no DataFrame / Selects the smallest block number in the DataFrame <--

maior_bloco = df_blocks.select("block_height").orderBy(F.desc("block_height")).first()[0]
# --> Seleciona o maior número de bloco presente no DataFrame / Selects the largest block number in the DataFrame <--

print(f"Menor bloco salvo: {menor_bloco}")
# --> Exibe o menor número de bloco salvo para fins de auditoria / Displays the lowest saved block number for auditing <--

print(f"Maior bloco salvo: {maior_bloco}")
# --> Exibe o maior número de bloco salvo para fins de auditoria / Displays the highest saved block number for auditing <--

In [None]:
windowSpec = Window.orderBy("block_height")
# --> Define uma janela ordenada pela altura do bloco para operações entre linhas consecutivas / Defines a window ordered by block height for row-to-row operations <--

df_blocks = df_blocks.withColumn("prev_block_height", F.lag("block_height").over(windowSpec))
# --> Adiciona uma nova coluna com a altura do bloco anterior utilizando a função lag() / Adds a new column with the previous block height using lag() function <--

Isso irá mostrar os blocos onde houve gaps:  

+-----------------+-------------+  
|prev_block_height|block_height|  
+-----------------+-------------+  
|-----875540-----|---875550---|   <-- Gap de 9 blocos  
|-----875560-----|---875570---|   <-- Gap de 9 blocos  
+-----------------+-------------+  

In [None]:
df_blocks_filtered = df_blocks.select("block_height", "prev_block_height")
# --> Seleciona apenas as colunas de altura atual e anterior dos blocos / Selects only current and previous block height columns <--

df_gaps = df_blocks_filtered.withColumn("gap_detected", (col("block_height") - col("prev_block_height")) > 1)
# --> Cria uma nova coluna indicando se há um intervalo (gap) entre blocos consecutivos / Creates a new column indicating if there's a gap between consecutive blocks <--

df_gaps_filtered = df_gaps.filter(col("gap_detected") == True)
# --> Filtra apenas os casos onde um intervalo foi detectado / Filters only rows where a gap was detected <--

df_gaps_filtered.select("prev_block_height", "block_height").show()
# --> Exibe os blocos onde há lacunas na sequência para auditoria / Displays blocks with gaps in the sequence for auditing <--

In [None]:
image_path = "/Users/rodrigocampos/Documents/Bitcoin/project/src/visualizations/BTC_black.png"

In [None]:
plot_btc_blocks_per_day(df_blocks, image_path)

##### 🔶 ₿ -----> timestamp

In [None]:
df_blocks.select(
    F.min("timestamp").alias("Min Timestamp"),
    F.max("timestamp").alias("Max Timestamp")
).show(truncate=False)
# --> Exibe o menor e o maior timestamp presentes no DataFrame para auditoria do intervalo temporal / Displays the minimum and maximum timestamps in the DataFrame for auditing the time range <--

In [None]:
df_blocks = df_blocks.orderBy("timestamp")
# --> Ordena o DataFrame pela coluna de timestamp em ordem crescente / Sorts the DataFrame by the timestamp column in ascending order <--

In [None]:
windowSpec = Window.orderBy("timestamp")
# --> Define uma janela ordenada por timestamp para acessar registros anteriores / Defines a window ordered by timestamp to access previous rows <--

df_blocks = df_blocks.withColumn("prev_timestamp", F.lag("timestamp").over(windowSpec))
# --> Adiciona uma nova coluna com o timestamp do bloco anterior usando função lag() / Adds a new column with the previous block's timestamp using lag() function <--

In [None]:
df_blocks = df_blocks.withColumn("time_diff", 
                                 F.unix_timestamp("timestamp") - F.unix_timestamp("prev_timestamp"))
# --> Cria uma nova coluna com a diferença de tempo (em segundos) entre blocos consecutivos / Creates a new column with time difference (in seconds) between consecutive blocks <--

Valores de time_diff que podem indicar anomalias:  
  
1.	Muito pequeno (< 60 segundos)  
2.	Acima de 900-1200 segundos (15-20 minutos)  
3.	Maior que 1800 segundos (30 minutos)  
4.	Extremamente alto (> 3600 segundos, ou seja, mais de 1 hora)  


In [None]:
df_blocks_filtered = df_blocks.select("prev_timestamp", "timestamp", "time_diff")
# --> Seleciona apenas as colunas relacionadas ao intervalo de tempo entre blocos / Selects only columns related to time gaps between blocks <--

df_large_gaps = df_blocks_filtered.filter(col("time_diff") > 1800)
# --> Filtra os casos com lacunas de tempo maiores que 30 minutos (1800 segundos) / Filters cases with time gaps greater than 30 minutes (1800 seconds) <--

df_small_gaps = df_blocks_filtered.filter(col("time_diff") < 60)
# --> Filtra os casos com intervalos de tempo menores que 1 minuto (60 segundos) / Filters cases with time gaps smaller than 1 minute (60 seconds) <--

df_gaps = df_large_gaps.union(df_small_gaps)
# --> Combina os casos de lacunas anômalas (muito grandes ou muito pequenas) / Combines anomalous gap cases (too large or too small) <--

df_gaps.show(10)
# --> Exibe os primeiros 10 registros de lacunas identificadas para auditoria / Displays first 10 identified gaps for auditing <--

In [None]:
windowSpec = Window.orderBy("timestamp")
# --> Define uma janela ordenada por timestamp para acessar registros anteriores / Defines a window ordered by timestamp to access previous rows <--

df_blocks = df_blocks.withColumn("prev_timestamp", F.lag("timestamp").over(windowSpec))
# --> Cria nova coluna com o timestamp do bloco anterior usando a função lag() / Adds a new column with the previous block's timestamp using lag() <--

df_blocks = df_blocks.withColumn("time_diff", F.unix_timestamp("timestamp") - F.unix_timestamp("prev_timestamp"))
# --> Calcula a diferença de tempo entre blocos consecutivos em segundos / Computes time difference between consecutive blocks in seconds <--

df_time_stats = df_blocks.select(
    F.avg("time_diff").alias("avg_time_between_blocks"),
    F.stddev("time_diff").alias("stddev_time_between_blocks"),
    F.min("time_diff").alias("min_time_between_blocks"),
    F.max("time_diff").alias("max_time_between_blocks")
).show()
# --> Calcula estatísticas descritivas do tempo entre blocos: média, desvio padrão, mínimo e máximo / Computes descriptive statistics of time between blocks: mean, stddev, min, and max <--

##### 🔶 ₿ -----> num_transactions

In [None]:
df_blocks.select(
    F.min("num_transactions").alias("min_transactions"),
    F.max("num_transactions").alias("max_transactions"),
    F.avg("num_transactions").alias("avg_transactions"),
    F.stddev("num_transactions").alias("stddev_transactions")
).show()
# --> Calcula estatísticas descritivas da quantidade de transações por bloco: mínimo, máximo, média e desvio padrão / Computes descriptive statistics for the number of transactions per block: min, max, mean, and standard deviation <--

In [None]:
df_empty_blocks = df_blocks.filter(F.col("num_transactions") == 0)
# --> Filtra os blocos que não contêm transações (blocos vazios) / Filters blocks with zero transactions (empty blocks) <--

df_empty_blocks.count()
# --> Conta quantos blocos vazios existem no DataFrame / Counts how many empty blocks exist in the DataFrame <--

In [None]:
df_blocks.groupBy("num_transactions").count().orderBy("num_transactions").show(5)
# --> Agrupa os blocos pela quantidade de transações, conta quantos blocos possuem cada valor e exibe os 5 menores / Groups blocks by number of transactions, counts how many blocks have each value, and shows the 5 lowest values <--

In [None]:
plot_transaction_distribution(df_blocks, image_path)

In [None]:
df_anomalies = df_blocks.filter((F.col("num_transactions") < 500) | (F.col("num_transactions") > 3000))
# --> Filtra os blocos com número anômalo de transações (muito baixo ou muito alto) / Filters blocks with anomalous number of transactions (too low or too high) <--

df_anomalies.select("block_height", "timestamp", "num_transactions").show(5)
# --> Exibe os primeiros 5 blocos anômalos com altura, timestamp e número de transações / Displays the first 5 anomalous blocks with height, timestamp, and number of transactions <--

##### 🔶 ₿ -----> total_btc_moved

In [None]:
df_blocks.select(
    F.min("total_btc_moved").alias("min_btc_moved"),
    F.max("total_btc_moved").alias("max_btc_moved"),
    F.avg("total_btc_moved").alias("avg_btc_moved"),
    F.stddev("total_btc_moved").alias("stddev_btc_moved")
).show()
# --> Calcula estatísticas descritivas sobre o total de BTC movimentado por bloco: mínimo, máximo, média e desvio padrão / Computes descriptive statistics for total BTC moved per block: min, max, mean, and standard deviation <--

In [None]:
df_zero_btc = df_blocks.filter(F.col("total_btc_moved") == 0)
# --> Filtra os blocos que não movimentaram nenhum BTC (valor total igual a zero) / Filters blocks with no BTC moved (total value equals zero) <--

df_zero_btc.count()
# --> Conta quantos blocos possuem valor total de BTC movimentado igual a zero / Counts how many blocks have zero total BTC moved <--

In [None]:
stats = df_blocks.select(
    F.avg("total_btc_moved").alias("mean"),
    F.stddev("total_btc_moved").alias("stddev")
).collect()[0]
# --> Calcula a média e o desvio padrão da quantidade total de BTC movimentado por bloco / Computes mean and standard deviation of total BTC moved per block <--

mean_btc_moved = stats["mean"]
stddev_btc_moved = stats["stddev"]
# --> Armazena os valores de média e desvio padrão para uso posterior / Stores the mean and standard deviation values for later use <--

df_anomalies = df_blocks.filter(col("total_btc_moved") > mean_btc_moved + 3 * stddev_btc_moved)
# --> Identifica blocos com valores anômalos (maiores que 3 desvios acima da média) / Identifies anomalous blocks (greater than 3 standard deviations above the mean) <--

df_anomalies.select("block_height", "timestamp", "total_btc_moved").show(5)
# --> Exibe os primeiros 5 blocos com movimentação anômala de BTC / Displays the first 5 blocks with anomalous BTC movement <--

In [None]:
df_btc_time = df_blocks.groupBy(F.date_format("timestamp", "yyyy-MM-dd").alias("date")) \
    .agg(F.avg("total_btc_moved").alias("avg_btc_moved")) \
    .orderBy("date")
# --> Agrupa os blocos por data e calcula a média diária de BTC movimentado / Groups blocks by date and calculates the daily average BTC moved <--

df_btc_time.show(5)
# --> Exibe as 5 primeiras datas com suas respectivas médias de BTC movimentado / Displays the first 5 dates with their average BTC moved <--

In [None]:
plot_btc_moved_hourly(df_blocks, image_path)

##### 🔶 ₿ -----> block_reward

In [None]:
df_no_reward = df_blocks.filter(F.col("block_reward") == 0)
# --> Filtra os blocos que não geraram recompensa (recompensa igual a zero) / Filters blocks with no reward (reward equal to zero) <--

df_no_reward.count()
# --> Conta quantos blocos não tiveram recompensa associada / Counts how many blocks had no associated reward <--

In [None]:
df_blocks = df_blocks.withColumn(
    "halving_period", 
    (F.col("block_height") / 210000).cast("int")
)
# --> Cria uma nova coluna indicando o período de halving com base na altura do bloco / Creates a new column indicating the halving period based on block height <--

df_blocks.groupBy("halving_period").agg(
    F.avg("block_reward").alias("avg_reward")
).orderBy("halving_period").show()
# --> Agrupa os blocos por período de halving e calcula a média da recompensa em cada um / Groups blocks by halving period and computes the average reward in each <--

In [None]:
valid_rewards = [50, 25, 12.5, 6.25, 3.125]
# --> Define os valores válidos esperados de recompensa por bloco com base nos eventos de halving do Bitcoin / Defines valid block reward values based on Bitcoin halving events <--

df_anomalies = df_blocks.filter(~F.col("block_reward").isin(valid_rewards))
# --> Filtra os blocos cuja recompensa não pertence à lista de valores esperados (possível anomalia) / Filters blocks whose reward is not in the list of expected values (potential anomaly) <--

df_anomalies.select("block_height", "timestamp", "block_reward").show()
# --> Exibe os blocos com recompensas consideradas anômalas para investigação / Displays blocks with anomalous rewards for investigation <--

##### 🔶 ₿ -----> total_fees

In [None]:
df_blocks.select(
    F.min("total_fees").alias("min_fees"),
    F.max("total_fees").alias("max_fees"),
    F.avg("total_fees").alias("avg_fees"),
    F.stddev("total_fees").alias("stddev_fees")
).show(5)
# --> Calcula estatísticas descritivas sobre as taxas totais por bloco: mínimo, máximo, média e desvio padrão / Computes descriptive statistics for total fees per block: min, max, mean, and standard deviation <--

In [None]:
df_blocks.select(F.col("total_fees"), F.col("num_transactions")).stat.corr("total_fees", "num_transactions")
# --> Calcula a correlação de Pearson entre as taxas totais e o número de transações por bloco / Computes the Pearson correlation between total fees and number of transactions per block <--

In [None]:
stats = df_blocks.select(
    F.avg("total_fees").alias("mean"),
    F.stddev("total_fees").alias("stddev")
).collect()[0]
# --> Calcula a média e o desvio padrão das taxas totais por bloco / Computes the mean and standard deviation of total fees per block <--

mean_fees = stats["mean"]
stddev_fees = stats["stddev"]
# --> Armazena os valores da média e do desvio padrão para uso posterior / Stores the mean and standard deviation values for later use <--

df_anomalies = df_blocks.filter(F.col("total_fees") > mean_fees + 3 * stddev_fees)
# --> Filtra os blocos com taxas totais significativamente altas (acima de 3 desvios da média) / Filters blocks with significantly high total fees (above 3 standard deviations from the mean) <--

df_anomalies.select("block_height", "timestamp", "total_fees", "num_transactions").show()
# --> Exibe os blocos anômalos com suas respectivas taxas, timestamps e número de transações / Displays the anomalous blocks with their fees, timestamps, and transaction counts <--

In [None]:
df_fees_time = df_blocks.groupBy(F.date_format("timestamp", "yyyy-MM-dd").alias("date")) \
    .agg(F.avg("total_fees").alias("avg_total_fees")) \
    .orderBy("date")
# --> Agrupa os blocos por data e calcula a média diária das taxas totais / Groups blocks by date and calculates the daily average of total fees <--

df_fees_time.show(5)
# --> Exibe as 5 primeiras datas com suas respectivas médias de taxas totais / Displays the first 5 dates with their average total fees <--

##### 🔶 ₿ -----> block_value_btc

In [None]:
df_blocks = df_blocks.withColumn("calculated_block_value", F.col("block_reward") + F.col("total_fees"))
# --> Cria uma nova coluna com o valor total do bloco calculado (recompensa + taxas) / Creates a new column with the calculated total block value (reward + fees) <--

df_mismatch = df_blocks.filter(F.col("block_value_btc") != F.col("calculated_block_value"))
# --> Filtra os blocos onde o valor total registrado é diferente do valor calculado / Filters blocks where the recorded block value differs from the calculated value <--

df_mismatch.select("block_height", "timestamp", "block_reward", "total_fees", "block_value_btc", "calculated_block_value").show()
# --> Exibe os blocos com discrepâncias entre valor registrado e valor calculado / Displays blocks with discrepancies between recorded and calculated block values <--

In [None]:
df_blocks.select(
    F.min("block_value_btc").alias("min_block_value"),
    F.max("block_value_btc").alias("max_block_value"),
    F.avg("block_value_btc").alias("avg_block_value"),
    F.stddev("block_value_btc").alias("stddev_block_value")
).show()
# --> Calcula estatísticas descritivas do valor total de BTC por bloco: mínimo, máximo, média e desvio padrão / Computes descriptive statistics for total BTC value per block: min, max, mean, and standard deviation <--

In [None]:
plot_btc_total_hourly(df_blocks, image_path)

In [None]:
df_block_time = df_blocks.groupBy(F.date_format("timestamp", "yyyy-MM-dd").alias("date")) \
    .agg(F.avg("block_value_btc").alias("avg_block_value")) \
    .orderBy("date")
# --> Agrupa os blocos por data e calcula a média diária do valor total dos blocos (recompensa + taxas) / Groups blocks by date and calculates the daily average block value (reward + fees) <--

df_block_time.show(10)
# --> Exibe as 10 primeiras datas com suas respectivas médias de valor de bloco em BTC / Displays the first 10 dates with their respective average block value in BTC <--

In [None]:
stats = df_blocks.select(
    F.avg("block_value_btc").alias("mean"),
    F.stddev("block_value_btc").alias("stddev")
).collect()[0]
# --> Calcula a média e o desvio padrão do valor total dos blocos (em BTC) / Computes the mean and standard deviation of total block value (in BTC) <--

mean_block_value = stats["mean"]
stddev_block_value = stats["stddev"]
# --> Armazena os valores da média e do desvio padrão para uso posterior / Stores mean and standard deviation values for later use <--

df_anomalies = df_blocks.filter(F.col("block_value_btc") > mean_block_value + 3 * stddev_block_value)
# --> Filtra os blocos com valores totais anômalos (acima de 3 desvios padrão da média) / Filters blocks with anomalously high total values (above 3 standard deviations from the mean) <--

##### 🔶 ₿ -----> Correlação

In [None]:
correlation_reward_height = df_blocks.select(F.col("block_reward"), F.col("block_height")).stat.corr("block_reward", "block_height")
# --> Calcula a correlação de Pearson entre a recompensa do bloco e sua altura / Computes Pearson correlation between block reward and block height <--

print(f"Correlação entre block_reward e block_height: {correlation_reward_height}")
# --> Exibe o valor da correlação entre recompensa e altura do bloco / Prints the correlation between reward and block height <--

correlation_fees_transactions = df_blocks.select(F.col("total_fees"), F.col("num_transactions")).stat.corr("total_fees", "num_transactions")
# --> Calcula a correlação entre as taxas totais e o número de transações por bloco / Computes the correlation between total fees and number of transactions per block <--

print(f"Correlação entre total_fees e num_transactions: {correlation_fees_transactions}")
# --> Exibe o valor da correlação entre taxas e número de transações / Prints the correlation between total fees and number of transactions <--

correlation_value_moved = df_blocks.select(F.col("block_value_btc"), F.col("total_btc_moved")).stat.corr("block_value_btc", "total_btc_moved")
# --> Calcula a correlação entre o valor total do bloco e o total de BTC movimentado / Computes the correlation between total block value and total BTC moved <--

print(f"Correlação entre block_value_btc e total_btc_moved: {correlation_value_moved}")
# --> Exibe o valor da correlação entre valor do bloco e BTC movimentado / Prints the correlation between block value and BTC moved <--

df_correlation = spark.createDataFrame([
    ("block_reward x block_height", correlation_reward_height),
    ("total_fees x num_transactions", correlation_fees_transactions),
    ("block_value_btc x total_btc_moved", correlation_value_moved)
], ["Variáveis", "Correlação"])
# --> Cria um DataFrame com os pares de variáveis e seus respectivos coeficientes de correlação / Creates a DataFrame with variable pairs and their correlation coefficients <--

df_correlation.show()
# --> Exibe a tabela de correlação entre variáveis de interesse / Displays the correlation table between key variables <--

In [None]:
df_blocks.show(5)
df_blocks.printSchema()

<img src="https://upload.wikimedia.org/wikipedia/commons/4/46/Bitcoin.svg" alt="Logo do Bitcoin" width="100" style="margin-right: 30px;">
<font size="15">Transactions</font>

  root   
 |-- tx_hash: string  ------------------------> Representa o identificador único da transação, gerado a partir do hash criptográfico da transação atual, garantindo a integridade e segurança da blockchain.  
 |-- block_height: long  ---------------------> Representa a altura do bloco na blockchain, ou seja, o número sequencial de cada bloco na cadeia.  
 |-- timestamp: timestamp_ntz  ------------> A coluna timestamp registra o momento exato em que um bloco foi minerado e adicionado à blockchain.  
 |-- address: string  ------------------------> O endereço da carteira envolvida na transação, identificando remetentes e destinatários.  
 |-- direction: string  -----------------------> Indica se a transação é de entrada ("in") ou saída ("out") para o endereço da carteira.  
 |-- input_count: long  ---------------------> Quantidade de inputs na transação, ou seja, quantas entradas foram usadas para compor a transação.  
 |-- output_count: long  --------------------> Quantidade de outputs na transação, ou seja, para quantos endereços os valores foram enviados.  
 |-- fee: double  ---------------------------> Representa a taxa de transação paga aos mineradores pela inclusão da transação no bloco.    
 |-- transaction_size: long  ----------------> O tamanho da transação em bytes, determinando o espaço que ela ocupa no bloco.  


In [None]:
PASTA_TRANSACTIONS = "/Users/rodrigocampos/Library/Mobile Documents/com~apple~CloudDocs/requests/blockchain_transactions_part"

df_transactions = spark.read.parquet(PASTA_TRANSACTIONS)

In [None]:
df_transactions.show(5)
df_transactions.printSchema()

##### 🔶 ₿ -----> timestamp

In [None]:
df_transactions.select(
    F.min("timestamp").alias("Min Timestamp"),
    F.max("timestamp").alias("Max Timestamp")
).show(truncate=False)
# --> Exibe o menor e o maior timestamp presentes na tabela de transações para auditoria do intervalo temporal / Displays the minimum and maximum timestamps in the transactions table for auditing the time range <--

In [None]:
df_transactions = df_transactions.orderBy("timestamp")
# --> Ordena o DataFrame de transações em ordem crescente de tempo / Sorts the transactions DataFrame in ascending order of time <--

In [None]:
windowSpec = Window.orderBy("timestamp")
# --> Define uma janela ordenada por timestamp para acessar registros anteriores / Defines a window ordered by timestamp to access previous rows <--

df_transactions = df_transactions.withColumn("prev_timestamp", F.lag("timestamp").over(windowSpec))
# --> Cria nova coluna com o timestamp da transação anterior usando a função lag() / Adds a new column with the previous transaction's timestamp using the lag() function <--

In [None]:
df_transactions = df_transactions.withColumn("time_diff", 
                                 F.unix_timestamp("timestamp") - F.unix_timestamp("prev_timestamp"))
# --> Calcula a diferença de tempo entre transações consecutivas em segundos / Computes the time difference between consecutive transactions in seconds <--

##### 🔶 ₿ -----> fee

In [None]:
df_transactions.select(
    F.min("fee").alias("Min Fee"),
    F.max("fee").alias("Max Fee")
).show()
# --> Exibe a menor e a maior taxa de transação no DataFrame para análise de extremos / Displays the minimum and maximum transaction fees in the DataFrame for outlier analysis <--

In [None]:
df_fee_time = df_transactions.select(
    F.date_format("timestamp", "yyyy-MM-dd").alias("date"),
    F.col("fee")
).groupBy("date").agg(
    F.avg("fee").alias("avg_fee"),
    F.percentile_approx("fee", 0.25).alias("q1_fee"),
    F.percentile_approx("fee", 0.75).alias("q3_fee"),
    F.max("fee").alias("max_fee"),
    F.min("fee").alias("min_fee")
).orderBy("date")
# --> Agrupa as transações por data e calcula estatísticas diárias sobre as taxas: média, quartis, máximo e mínimo / Groups transactions by date and computes daily statistics on fees: mean, quartiles, max, and min <--

df_fee_time.show(5, truncate=False)
# --> Exibe as 5 primeiras datas com estatísticas de taxas de transação / Displays the first 5 dates with transaction fee statistics <--

In [None]:
df_transactions.filter(F.col("fee") < 0).select("total_input", "total_output", "fee").show(20, False)
# --> Filtra e exibe transações com taxa negativa, indicando possíveis inconsistências nos dados / Filters and displays transactions with negative fees, indicating possible data inconsistencies <--

In [None]:
df_fee_size_corr = df_transactions.stat.corr("fee", "transaction_size")
# --> Calcula a correlação de Pearson entre a taxa de transação e o tamanho da transação / Computes the Pearson correlation between transaction fee and transaction size <--

print(f"Correlação entre fee e transaction_size: {df_fee_size_corr}")
# --> Exibe o valor da correlação entre taxa e tamanho da transação / Prints the correlation value between fee and transaction size <--

In [None]:
df_inputs_outputs = df_transactions.select("num_inputs", "num_outputs")\
    .describe(["num_inputs", "num_outputs"])
# --> Gera estatísticas descritivas (count, mean, stddev, min, max) para o número de entradas e saídas por transação / Generates descriptive statistics (count, mean, stddev, min, max) for the number of inputs and outputs per transaction <--

df_inputs_outputs.show()
# --> Exibe as estatísticas descritivas calculadas / Displays the computed descriptive statistics <--

In [None]:
df_input_output = df_transactions.select(
    F.date_format("timestamp", "yyyy-MM-dd").alias("date"),
    "total_input",
    "total_output"
).groupBy("date").agg(
    F.avg("total_input").alias("avg_input"),
    F.avg("total_output").alias("avg_output"),
    F.sum("total_input").alias("sum_input"),
    F.sum("total_output").alias("sum_output")
).orderBy("date")
# --> Agrupa as transações por data e calcula estatísticas diárias sobre valores de entrada e saída: média e soma / Groups transactions by date and computes daily statistics for input and output values: average and sum <--

df_input_output.show(10, truncate=False)
# --> Exibe as 10 primeiras datas com médias e somas de valores de entrada e saída / Displays the first 10 dates with average and total values of inputs and outputs <--


In [None]:
df_transactions_daily = df_transactions.select(
    F.date_format("timestamp", "yyyy-MM-dd").alias("date")
).groupBy("date").agg(
    F.count("*").alias("num_transactions")
).orderBy("date")
# --> Agrupa as transações por data e conta quantas ocorreram por dia / Groups transactions by date and counts how many occurred per day <--

df_transactions_daily.show(10, truncate=False)
# --> Exibe as 10 primeiras datas com o número total de transações registradas / Displays the first 10 dates with the total number of transactions recorded <--


In [None]:
df_transaction_size_time = df_transactions.select(
    F.date_format("timestamp", "yyyy-MM-dd").alias("date"),
    "transaction_size"
).groupBy("date").agg(
    F.avg("transaction_size").alias("avg_transaction_size")
).orderBy("date")
# --> Agrupa as transações por data e calcula o tamanho médio das transações por dia / Groups transactions by date and calculates the average transaction size per day <--

df_transaction_size_time.show(10, truncate=False)
# --> Exibe as 10 primeiras datas com o tamanho médio diário das transações / Displays the first 10 dates with the average daily transaction size <--

In [None]:
df_transactions.show(5)
df_transactions.printSchema()

In [None]:
plot_transaction_fee_time_series(df_transactions, image_path)

<img src="https://upload.wikimedia.org/wikipedia/commons/4/46/Bitcoin.svg" alt="Logo do Bitcoin" width="100" style="margin-right: 30px;">
<font size="15">Addresses</font>

root   
 |-- tx_hash: string   
 |-- block_height: long  ----------------> Representa a altura do bloco na blockchain, ou seja, o número sequencial de cada bloco na cadeia.   
 |-- timestamp: timestamp_ntz  ---------->  
 |-- address: string  ------------------->  
 |-- direction: string  ----------------->  
 |-- amount: double --------------------->   
 |-- balance_before: double  ------------>  
 |-- balance_after: double  ------------->  
 |-- wallet_type: string  --------------->  
 |-- is_zero: boolean  ------------------>  

In [None]:
PASTA_ADDRESSES = "/Users/rodrigocampos/Library/Mobile Documents/com~apple~CloudDocs/requests/blockchain_addresses_part"

df_addresses = spark.read.parquet(PASTA_ADDRESSES)

In [None]:
df_addresses.show(5)
df_addresses.printSchema()

In [None]:
# df_amount_wallet = df_addresses.groupBy("wallet_type") \
#     .agg(
#         F.count("amount").alias("num_transacoes"),
#         F.avg("amount").alias("avg_amount"),
#         F.percentile_approx("amount", 0.25).alias("q1_amount"),
#         F.percentile_approx("amount", 0.75).alias("q3_amount"),
#         F.max("amount").alias("max_amount")
#     ).orderBy(F.desc("num_transacoes"))
# # --> Agrupa os endereços por tipo de carteira e calcula estatísticas do valor transferido: contagem, média, quartis e valor máximo / Groups addresses by wallet type and computes statistics on transferred amount: count, mean, quartiles, and max value <--

# df_amount_wallet.show(truncate=False)
# # --> Exibe as estatísticas por tipo de carteira sem truncar os valores / Displays wallet type statistics without truncating values <--

In [None]:
# df_balance_analysis = df_addresses.groupBy("address") \
#     .agg(
#         F.avg("balance_before").alias("avg_balance_before"),
#         F.avg("balance_after").alias("avg_balance_after"),
#         F.sum("amount").alias("total_movimentado")
#     ).orderBy(F.desc("total_movimentado"))
# # --> Agrupa por endereço e calcula o saldo médio antes/depois e o total movimentado por endereço / Groups by address and computes average balance before/after and total moved amount per address <--

# df_balance_analysis.show(truncate=False)
# # --> Exibe os endereços com maiores movimentações, ordenados pelo valor total transferido / Displays addresses with highest transfers, ordered by total moved value <--

In [None]:
df_flow_time = df_addresses.groupBy(F.date_format("timestamp", "yyyy-MM-dd").alias("date")) \
    .agg(
        F.sum(F.when(F.col("direction") == "input", F.col("amount"))).alias("total_input"),
        F.sum(F.when(F.col("direction") == "output", F.col("amount"))).alias("total_output")
    ).orderBy("date")
# --> Agrupa os dados por data e calcula os fluxos totais de entrada e saída de BTC / Groups data by date and calculates total BTC input and output flows <--

df_flow_time.show(10, truncate=False)
# --> Exibe as 10 primeiras datas com os respectivos totais de entrada e saída de BTC / Displays the first 10 dates with total BTC input and output values <--

In [None]:
df_wallet_trend = df_addresses.groupBy("wallet_type") \
    .agg(F.count("*").alias("num_transacoes")) \
    .orderBy(F.desc("num_transacoes"))
# --> Agrupa os registros por tipo de carteira e conta o número total de transações associadas a cada tipo / Groups records by wallet type and counts the total number of transactions per type <--

df_wallet_trend.show(truncate=False)
# --> Exibe a quantidade de transações por tipo de carteira, ordenadas da maior para a menor / Displays the number of transactions per wallet type, ordered from highest to lowest <--

In [None]:
df_zero_balance = df_addresses.filter(F.col("is_zero") == True) \
    .groupBy("wallet_type") \
    .agg(F.count("*").alias("num_enderecos_zerados")) \
    .orderBy(F.desc("num_enderecos_zerados"))
# --> Filtra os endereços com saldo zerado, agrupa por tipo de carteira e conta quantos existem em cada tipo / Filters addresses with zero balance, groups by wallet type, and counts how many exist per type <--

df_zero_balance.show(truncate=False)
# --> Exibe o número de endereços zerados por tipo de carteira, ordenados em ordem decrescente / Displays the number of zero-balance addresses by wallet type, ordered descending <--

In [None]:
df_exchanges_zerados = df_addresses.filter((F.col("is_zero") == True) & (F.col("wallet_type") == "exchange"))
# --> Filtra os endereços com saldo zerado e tipo de carteira 'exchange' / Filters zero balance addresses with wallet type 'exchange' <--

df_exchanges_zerados.show(10, truncate=False)
# --> Exibe os 10 primeiros endereços zerados do tipo 'exchange' / Displays the first 10 zero-balance addresses with wallet type 'exchange' <--

In [None]:
df_addresses_sorted = df_addresses.orderBy("address", "timestamp")
# --> Ordena os endereços por 'address' e 'timestamp' em ordem crescente / Sorts the addresses by 'address' and 'timestamp' in ascending order <--

In [None]:
df_addresses.select(
    F.min("balance_after").alias("Min balance_after"),
    F.max("balance_after").alias("Max balance_after")
).show()
# --> Calcula o menor e o maior saldo registrado após as transações / Computes the minimum and maximum balance after transactions <--

In [None]:
df_negative_balances = df_addresses.filter(F.col("balance_after") < 0)
# --> Filtra os endereços com saldo negativo após a transação / Filters addresses with negative balance after the transaction <--

df_negative_balances.show(10, truncate=False)
# --> Exibe os 10 primeiros endereços com saldo negativo / Displays the first 10 addresses with negative balance <--

In [None]:
df_negative_balances = df_addresses.filter(F.col("balance_before") < 0)
# --> Filtra os endereços com saldo negativo antes da transação / Filters addresses with negative balance before the transaction <--

df_negative_balances.show(10, truncate=False)
# --> Exibe os 10 primeiros endereços com saldo negativo antes da transação / Displays the first 10 addresses with negative balance before the transaction <--

In [None]:
df_addresses.groupBy("address").count().orderBy(F.desc("count")).show(10, truncate=False)
# --> Agrupa os endereços e conta o número de transações por endereço, ordenando pela quantidade de transações / Groups addresses and counts the number of transactions per address, ordering by transaction count <--

In [None]:
df_addresses.filter(F.col("balance_after") < 0).show(10, truncate=False)
# --> Filtra os endereços com saldo negativo após a transação / Filters addresses with negative balance after the transaction <--

In [None]:
df_addresses.filter(F.col("balance_after") > 0).show(10, truncate=False)
# --> Filtra os endereços com saldo positivo após a transação / Filters addresses with positive balance after the transaction <--

In [None]:
# window_spec = Window.partitionBy("address").orderBy("timestamp")
# --> Define a janela de cálculo para cada endereço, ordenando por timestamp / Defines the window for calculation for each address, ordered by timestamp <--

# df_corrected_balance = df_addresses.withColumn(
#     "balance_acumulado", 
#     F.sum("amount").over(window_spec)
# )
# --> Cria uma nova coluna 'balance_acumulado' com o saldo acumulado por endereço e ordenado por timestamp / Creates a new column 'balance_acumulado' with the cumulative balance per address, ordered by timestamp <--

# df_corrected_balance.show(10, truncate=False)
# --> Exibe os 10 primeiros registros com o saldo acumulado corrigido / Displays the first 10 records with the corrected cumulative balance <--

In [None]:
df_addresses.filter((F.col("balance_after") < 0) & (F.col("direction") == "output")).count()
# --> Conta os endereços com saldo negativo após a transação e direção de saída / Counts addresses with negative balance after the transaction and output direction <--

In [None]:
df_addresses.filter((F.col("address") == "xxx")).show()
# --> Filtra os endereços com o valor específico "xxx" e exibe as transações associadas / Filters addresses with the specific value "xxx" and displays the associated transactions <--

##### 🔶 ₿ -----> timestamp

In [None]:

df_addresses.select(
    F.min("timestamp").alias("Min Timestamp"),
    F.max("timestamp").alias("Max Timestamp")
).show(truncate=False)

In [None]:
df_addresses.select(
    F.min("timestamp").alias("Min Timestamp"),
    F.max("timestamp").alias("Max Timestamp")
).show(truncate=False)
# --> Exibe o menor e o maior timestamp presentes na tabela de endereços / Displays the minimum and maximum timestamps in the addresses table <--

In [None]:
windowSpec = Window.orderBy("timestamp")
# --> Define uma janela ordenada por timestamp para acessar o valor anterior / Defines a window ordered by timestamp to access the previous value <--

df_addresses = df_addresses.withColumn("prev_timestamp", F.lag("timestamp").over(windowSpec))
# --> Cria uma nova coluna com o timestamp anterior para cada endereço usando a função lag() / Creates a new column with the previous timestamp for each address using the lag() function <--

In [None]:
df_addresses = df_addresses.withColumn("time_diff", 
                                 F.unix_timestamp("timestamp") - F.unix_timestamp("prev_timestamp"))
# --> Cria uma nova coluna 'time_diff' com a diferença de tempo entre o timestamp atual e o anterior, em segundos / Creates a new column 'time_diff' with the time difference between current and previous timestamp, in seconds <--

In [None]:
primeiro_bloco = df_blocks.select("block_height").orderBy("block_height").first()[0]
# --> Seleciona o primeiro bloco (menor valor de block_height) / Selects the first block (lowest block_height value) <--

ultimo_bloco = df_blocks.select("block_height").orderBy(F.desc("block_height")).first()[0]
# --> Seleciona o último bloco (maior valor de block_height) / Selects the last block (highest block_height value) <--

print(f"Primeiro bloco salvo: {primeiro_bloco}")
# --> Exibe o primeiro bloco salvo / Displays the first saved block <--

print(f"Último bloco salvo: {ultimo_bloco}")
# --> Exibe o último bloco salvo / Displays the last saved block <--

In [None]:
df_blocks.printSchema()

In [None]:
import os
from pyspark.sql import functions as F
from pyspark import StorageLevel

# ==========================================================
# Caminhos dos dados / Data paths
# ==========================================================
SAVE_PATH_BLOCKS = "/Users/rodrigocampos/.../processed_data/blockchain_blocks_part"
SAVE_PATH_TRANSACTIONS = "/Users/rodrigocampos/.../processed_data/blockchain_transactions_part"
SAVE_PATH_ADDRESSES = "/Users/rodrigocampos/.../processed_data/blockchain_addresses_part"

FEATURE_PATH_BLOCKS = "/Users/rodrigocampos/.../bitcoin_features/blockchain_blocks_part"
FEATURE_PATH_TRANSACTIONS = "/Users/rodrigocampos/.../bitcoin_features/blockchain_transactions_part"
FEATURE_PATH_ADDRESSES = "/Users/rodrigocampos/.../bitcoin_features/blockchain_addresses_part"

METADATA_PATH = "/Users/rodrigocampos/.../processed_data/metadata.txt"

# ==========================================================
# Função para obter o intervalo salvo (com fallback no Parquet)
# / Retrieves the last saved block range from metadata.txt
# ==========================================================
def get_saved_block_range():
    if os.path.exists(METADATA_PATH):
        with open(METADATA_PATH, "r") as f:
            lines = [line.strip() for line in f.readlines() if line.strip()]
            first_block = int(lines[0]) if len(lines) > 0 else 0
            last_block = int(lines[1]) if len(lines) > 1 else 0
            return first_block, last_block
        # --> Reads and parses the block range from metadata.txt / Lê e interpreta o intervalo de blocos do metadata <--
    else:
        return 0, 0
        # --> Default when metadata does not exist / Valor padrão se metadata não for encontrado <--

# ==========================================================
# Atualiza o metadata.txt com os novos blocos salvos
# / Updates the metadata file with the new block range
# ==========================================================
def update_metadata(first_block, last_block):
    with open(METADATA_PATH, "w") as f:
        f.write(f"{first_block}\n{last_block}")
    # --> Writes updated block range to metadata file / Escreve novo intervalo de blocos no metadata <--

# ==========================================================
# Garante o nome correto da coluna de timestamp
# / Ensures the timestamp column is standardized
# ==========================================================
if "block_timestamp" not in df_blocks.columns:
    df_blocks = df_blocks.withColumnRenamed("timestamp", "block_timestamp")
    # --> Renames 'timestamp' to 'block_timestamp' if needed / Renomeia se necessário <--

# ==========================================================
# Lê intervalo salvo e extrai intervalo novo dos dados
# / Reads saved block range and extracts new range from DataFrame
# ==========================================================
first_saved_block, last_saved_block = get_saved_block_range()
# --> Retrieves the previously saved block interval / Recupera o intervalo de blocos salvo anteriormente <--

agg_values = df_blocks.select("block_height").agg(
    F.min("block_height").alias("min_block"),
    F.max("block_height").alias("max_block")
).first()
# --> Extracts min and max block from current data / Extrai o menor e maior bloco dos dados atuais <--

new_first_block = agg_values["min_block"] if agg_values["min_block"] is not None else 0
new_last_block = agg_values["max_block"] if agg_values["max_block"] is not None else 0
# --> Handles None values for safe initialization / Garante valores válidos mesmo que None <--

# ==========================================================
# Proteção extra caso os blocos estejam invertidos
# / Extra safety to handle inverted metadata ranges
# ==========================================================
min_ref = min(first_saved_block, last_saved_block)
max_ref = max(first_saved_block, last_saved_block)
# --> Ensures consistent min/max even if metadata is malformed / Garante min e max válidos mesmo que invertidos <--

# ==========================================================
# Filtra blocos que estão fora do intervalo já salvo
# / Filters out blocks already saved, keeping only new ones
# ==========================================================
df_blocks_to_save = df_blocks.filter((F.col("block_height") < min_ref) | (F.col("block_height") > max_ref))
df_transactions_to_save = df_transactions.filter((F.col("block_height") < min_ref) | (F.col("block_height") > max_ref))
df_addresses_to_save = df_addresses.filter((F.col("block_height") < min_ref) | (F.col("block_height") > max_ref))
# --> Keeps only blocks outside the saved range / Mantém apenas blocos ainda não salvos <--

# ==========================================================
# Executa salvamento de forma segura e eficiente
# / Safely saves partitioned data and updates metadata
# ==========================================================
try:
    if not df_blocks_to_save.rdd.isEmpty():
        print("[INFO] Iniciando persistência dos dados...")

        df_blocks_to_save = df_blocks_to_save.withColumn("block_timestamp", F.col("block_timestamp").cast("timestamp")) \
                                             .withColumn("year", F.year("block_timestamp")) \
                                             .withColumn("month", F.month("block_timestamp")) \
                                             .repartition("year", "month") \
                                             .persist(StorageLevel.MEMORY_AND_DISK)
        # --> Prepares blocks DataFrame for partitioned saving / Prepara o DataFrame de blocos para particionamento <--

        df_transactions_to_save = df_transactions_to_save.withColumn("timestamp", F.col("timestamp").cast("timestamp")) \
                                                         .withColumn("year", F.year("timestamp")) \
                                                         .withColumn("month", F.month("timestamp")) \
                                                         .repartition("year", "month") \
                                                         .persist(StorageLevel.MEMORY_AND_DISK)
        # --> Prepares transactions DataFrame / Prepara o DataFrame de transações <--

        df_addresses_to_save = df_addresses_to_save.withColumn("timestamp", F.col("timestamp").cast("timestamp")) \
                                                   .withColumn("year", F.year("timestamp")) \
                                                   .withColumn("month", F.month("timestamp")) \
                                                   .repartition("year", "month") \
                                                   .persist(StorageLevel.MEMORY_AND_DISK)
        # --> Prepares addresses DataFrame / Prepara o DataFrame de endereços <--

        print("[INFO] Salvando blocos...")
        df_blocks_to_save.write.mode("append").partitionBy("year", "month").parquet(SAVE_PATH_BLOCKS)
        df_blocks_to_save.write.mode("overwrite").partitionBy("year", "month").parquet(FEATURE_PATH_BLOCKS)
        # --> Saves blocks both incrementally and for features / Salva blocos incrementalmente e para features <--

        print("[INFO] Salvando transações...")
        df_transactions_to_save.write.mode("append").partitionBy("year", "month").parquet(SAVE_PATH_TRANSACTIONS)
        df_transactions_to_save.write.mode("overwrite").partitionBy("year", "month").parquet(FEATURE_PATH_TRANSACTIONS)

        print("[INFO] Salvando endereços...")
        df_addresses_to_save.write.mode("append").partitionBy("year", "month").parquet(SAVE_PATH_ADDRESSES)
        df_addresses_to_save.write.mode("overwrite").partitionBy("year", "month").parquet(FEATURE_PATH_ADDRESSES)

        # ==========================================================
        # Atualiza intervalo salvo com base nos blocos efetivamente salvos
        # / Updates the metadata range with actual saved data
        # ==========================================================
        saved_min = df_blocks_to_save.select(F.min("block_height")).first()[0]
        saved_max = df_blocks_to_save.select(F.max("block_height")).first()[0]

        updated_first = min(min_ref, saved_min) if min_ref != 0 else saved_min
        updated_last = max(max_ref, saved_max) if max_ref != 0 else saved_max
        # --> Ensures metadata covers all saved data / Garante que o metadata cubra todos os blocos salvos <--

        update_metadata(updated_first, updated_last)
        # --> Persists updated range to metadata.txt / Atualiza o metadata com novo intervalo <--

        total_saved = df_blocks_to_save.select("block_height").distinct().count()
        print(f"[SUCESSO] Blocos fora do intervalo salvos: {total_saved}")
        print(f"[SUCESSO] Novo intervalo salvo: {updated_first} - {updated_last}")

        df_blocks_to_save.unpersist()
        df_transactions_to_save.unpersist()
        df_addresses_to_save.unpersist()
        # --> Releases memory from persisted DataFrames / Libera memória dos DataFrames persistidos <--

    else:
        print("[INFO] Nenhum bloco novo ou retroativo para salvar.")
        # --> No new data found to persist / Nenhum dado novo foi identificado para salvar <--

except Exception as e:
    print(f"[ERRO] Falha ao salvar dados: {e}")
    # --> Logs error if write fails / Exibe erro caso falhe ao salvar <--


In [None]:
# from pyspark.sql import SparkSession
# import os
# import shutil

# PASTA_BLOCKS = "/Users/rodrigocampos/Library/Mobile Documents/com~apple~CloudDocs/requests/blockchain_blocks_part"
# PASTA_TRANSACTIONS = "/Users/rodrigocampos/Library/Mobile Documents/com~apple~CloudDocs/requests/blockchain_transactions_part"
# PASTA_ADDRESSES = "/Users/rodrigocampos/Library/Mobile Documents/com~apple~CloudDocs/requests/blockchain_addresses_part"

# SAVE_PATH_BLOCKS = "/Users/rodrigocampos/Library/Mobile Documents/com~apple~CloudDocs/processed_data/blockchain_blocks_part"
# SAVE_PATH_TRANSACTIONS = "/Users/rodrigocampos/Library/Mobile Documents/com~apple~CloudDocs/processed_data/blockchain_transactions_part"
# SAVE_PATH_ADDRESSES = "/Users/rodrigocampos/Library/Mobile Documents/com~apple~CloudDocs/processed_data/blockchain_addresses_part"


# def verificar_e_excluir(origem, destino):
#     """
#     Verifica se os arquivos em 'processed_data' são iguais aos de 'requests'.
#     Se forem iguais em range de blocos, quantidade de linhas e identidade dos blocos, exclui os arquivos de 'requests'.
#     / Checks if files in 'processed_data' match those in 'requests'. 
#     If they match in block range, number of rows, and block identity, delete the files from 'requests'.
#     """
#     # Lendo os dados das duas pastas
#     df_orig = spark.read.parquet(origem)
#     df_proc = spark.read.parquet(destino)
  
 
#     # num_linhas_orig = df_orig.count()
#     # num_linhas_proc = df_proc.count()

#     # if num_linhas_orig != num_linhas_proc:
#     # --> Checks if the number of rows in both datasets are different / Verifica se o número de linhas nos dois datasets é diferente <--

#     min_block_orig, max_block_orig = df_orig.agg(F.min("block_height"), F.max("block_height")).collect()[0]
#     min_block_proc, max_block_proc = df_proc.agg(F.min("block_height"), F.max("block_height")).collect()[0]
#     # --> Obtains the first and last block saved in each dataset / Obtém o primeiro e último bloco salvo em cada dataset <--

#     if min_block_orig != min_block_proc or max_block_orig > max_block_proc:
#         print(f" Erro: Faixa de blocos diferente! Origem ({min_block_orig}--{max_block_orig}) vs Processado({min_block_proc}--{max_block_proc})")
#         # --> Checks if the block range in both datasets is the same / Verifica se o intervalo de blocos nos dois datasets é o mesmo <--
#         return False

#     # if "block_height" in df_orig.columns and "block_height" in df_proc.columns:
#     #     hash_diff = df_orig.select("block_height").subtract(df_proc.select("block_height")).count()
#     #     if hash_diff > 0:
#     #         print(f" Erro: Diferença nos hashes dos blocos! {hash_diff} blocos não coincidem")
#     #         return False

   
#     print(f" Dados correspondem! Excluindo arquivos de {origem}...")
#     shutil.rmtree(origem)
#     os.makedirs(origem)  
#     # --> Deletes files from the 'requests' folder if data matches / Exclui arquivos da pasta 'requests' se os dados coincidirem <--
#     return True



# print("\nVerificando e removendo arquivos de requests...")

# verificar_e_excluir(PASTA_BLOCKS, SAVE_PATH_BLOCKS)
# verificar_e_excluir(PASTA_TRANSACTIONS, SAVE_PATH_TRANSACTIONS)
# verificar_e_excluir(PASTA_ADDRESSES, SAVE_PATH_ADDRESSES)

# print("\nProcesso concluído!")
# # --> Executes the check and delete process for each data folder / Executa o processo de verificação e exclusão para cada pasta de dados <--

<img src="https://upload.wikimedia.org/wikipedia/commons/4/46/Bitcoin.svg" alt="Logo do Bitcoin" width="100" style="margin-right: 30px;">
<font size="15">Relação entre Tabelas</font>

In [None]:
# --> Joins addresses with transactions to map wallet behavior / Junta endereços com transações para mapear o comportamento das carteiras <--

# df_addresses_transactions = df_addresses.join(
#     df_transactions,
#     on="tx_hash",
#     how="inner"
# )
# --> Faz o join entre os DataFrames de endereços e transações usando a chave 'tx_hash' / Joins the addresses and transactions DataFrames on 'tx_hash' <--

# df_addresses_transactions.select(
#     "tx_hash", "address", "direction", "amount", "balance_before", "balance_after", "num_inputs", "num_outputs"
# ).orderBy(F.desc("amount")).show(10, truncate=False)
# --> Exibe os 10 primeiros registros de transações ordenados pela quantidade de 'amount', incluindo dados como direção da transação, saldo antes e depois, número de entradas e saídas / Displays the first 10 transaction records ordered by 'amount', including direction, balance before and after, and number of inputs and outputs <--

In [None]:


# df_blocks = df_blocks.withColumnRenamed("timestamp", "block_timestamp")
# --> Renomeia a coluna 'timestamp' para 'block_timestamp' para evitar ambiguidade / Renames the 'timestamp' column to 'block_timestamp' to avoid ambiguity <--

# df_addresses_blocks = df_addresses.join(
#     df_blocks,
#     on="block_height",
#     how="inner"
# )
# --> Faz o join entre os DataFrames de endereços e blocos usando 'block_height' como chave / Joins the addresses and blocks DataFrames on 'block_height' <--

# df_addresses_blocks.select(
#     "block_height", 
#     "block_timestamp",  # Nome modificado para evitar ambiguidade
#     "address", 
#     "amount", 
#     "wallet_type"
# ).orderBy(F.desc("amount")).show(10, truncate=False)
# --> Exibe os 10 primeiros registros de transações por bloco, ordenados por 'amount', incluindo informações sobre o endereço e tipo de carteira / Displays the first 10 transaction records by block, ordered by 'amount', including address and wallet type <--

In [None]:
# --> Groups addresses by wallet type to identify exchanges and institutional movement / Agrupa endereços por tipo de carteira para identificar exchanges e movimentação institucional <--

# df_wallet_type = df_addresses.groupBy("wallet_type").agg(
#     F.countDistinct("address").alias("num_addresses"),
#     F.sum("amount").alias("total_moved")
# ).orderBy(F.desc("total_moved"))
# --> Conta o número de endereços distintos e soma o total movimentado por cada tipo de carteira / Counts distinct addresses and sums the total moved for each wallet type <--

# df_wallet_type.show(10, truncate=False)
# --> Exibe os 10 tipos de carteiras com maior movimentação total / Displays the top 10 wallet types with the highest total movement <--

In [None]:
# --> Relates transactions to fees to detect network congestion / Relaciona transações a taxas para detectar congestionamento da rede <--

# df_transactions_fees = df_transactions.select(
#     "timestamp", "num_inputs", "num_outputs", "fee", "transaction_size"
# ).orderBy(F.desc("fee"))
# --> Seleciona as colunas de transações relevantes (timestamp, entradas, saídas, taxa e tamanho da transação) e ordena pela taxa / Selects relevant transaction columns (timestamp, inputs, outputs, fee, transaction size) and orders by fee <--

# df_transactions_fees.show(10, truncate=False)
# --> Exibe as 10 transações com maior taxa, para identificar possíveis congestionamentos / Displays the top 10 transactions with the highest fee to identify potential congestion <--

In [None]:
# --> Creates a cumulative balance table to track Bitcoin adoption / Cria uma tabela de saldos acumulados para rastrear a adoção do BTC <--

# window_spec = Window.partitionBy("address").orderBy("timestamp")
# --> Define uma janela para calcular o saldo acumulado por endereço, ordenando por timestamp / Defines a window to calculate cumulative balance by address, ordered by timestamp <--

# df_saldos = df_addresses.withColumn(
#     "saldo_acumulado", F.sum("amount").over(window_spec)
# )
# --> Cria a coluna 'saldo_acumulado' com a soma dos valores de 'amount' para cada endereço / Creates the 'saldo_acumulado' column with the sum of 'amount' values for each address <--

# df_saldos.show(10, truncate=False)
# --> Exibe os 10 primeiros endereços com saldo acumulado / Displays the first 10 addresses with cumulative balance <--

25/04/08 21:25:23 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:123)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$lzycompute$1(BlockManagerMasterEndpoint.scala:688)
	at org.apache.spark.storage.BlockManagerMasterE