In [1]:
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import pyspark
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, expr, collect_list, struct
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
import numpy as np
import time
from IPython.display import clear_output
from pyspark.sql.functions import col, isnan, when, trim
from pyspark.sql import functions as F

import os
import sys
from pyspark.sql import SparkSession

In [2]:
def filter_last_n_blocks(df, atual_block_id, n):
    """Keep only the rows of ``df`` whose ``Block_ID`` lies in ``[atual_block_id - n, atual_block_id)``.

    The lower bound is inclusive and the upper bound exclusive, so rows with
    ``Block_ID == atual_block_id`` are NOT kept.

    Args:
        df: DataFrame exposing a ``Block_ID`` column (Spark DataFrames in this notebook).
        atual_block_id: Exclusive upper bound of the window ("atual" = current).
        n: Width of the window in Block_ID units.

    Returns:
        The filtered DataFrame.
    """
    block_ids = df["Block_ID"]
    lower_bound = atual_block_id - n
    in_window = (block_ids >= lower_bound) & (block_ids < atual_block_id)
    return df.filter(in_window)

def _read_output_csv(output_dir, relative_path):
    """Read one CSV export located at ``output_dir/relative_path`` (header row, inferred schema)."""
    # Relies on the notebook-level ``spark`` session created in the setup cell.
    return spark.read.csv(f"{output_dir}/{relative_path}", header=True, inferSchema=True)


def get_dataframes(num_last_blocks=50, output_dir="../Scripts/output"):
    """Load the block, transaction and swap CSV exports, restricted to recent blocks.

    The reference block is the maximum ``Block_ID`` found in the block export.
    Each table is then filtered with ``filter_last_n_blocks`` to the half-open
    window ``[last_block - num_last_blocks, last_block)``. This window excludes
    ``last_block`` itself. NOTE(review): confirm that skipping the newest block
    is intended.

    Args:
        num_last_blocks: Width of the Block_ID window to keep.
        output_dir: Directory holding the ``block/``, ``swap/`` and
            ``transaction/`` CSV exports. The default is the path that was
            previously hard-coded here.

    Returns:
        Tuple ``(block_df, transaction_df, swap_df)`` of Spark DataFrames.
        If no maximum Block_ID exists (the block export is empty or its
        Block_ID values are all null), all three are returned with zero rows
        and their original schemas.
    """
    block_df = _read_output_csv(output_dir, "block/block.csv")
    swap_df = _read_output_csv(output_dir, "swap/swap_transactions.csv")
    transaction_df = _read_output_csv(output_dir, "transaction/transactions.csv")

    last_block = block_df.agg(F.max("Block_ID")).collect()[0][0]
    if last_block is None:
        # max() over no non-null values is NULL; subtracting from None would raise TypeError.
        return block_df.limit(0), transaction_df.limit(0), swap_df.limit(0)

    block_df = filter_last_n_blocks(block_df, last_block, num_last_blocks)
    transaction_df = filter_last_n_blocks(transaction_df, last_block, num_last_blocks)
    swap_df = filter_last_n_blocks(swap_df, last_block, num_last_blocks)

    return block_df, transaction_df, swap_df

In [3]:
def to_null(c):
    """Build a column expression that maps "empty" values of column ``c`` to NULL.

    The expression yields ``col(c)`` unless the value is NULL, NaN, or a string
    that is empty after trimming whitespace. Because ``when`` is used without
    ``otherwise``, those unmatched cases evaluate to NULL.

    NOTE(review): Spark's ``isnan`` expects numeric input. Confirm it accepts
    every column type present in the inputs (e.g. dates/timestamps).
    """
    value = col(c)
    is_missing = value.isNull() | isnan(value) | (trim(value) == "")
    return when(~is_missing, value)

def clean_swap_dataframe(swap_df, zero_price_columns=("From_Token_Price", "To_Token_Price")):
    """Drop incomplete swap rows, then mark zero prices as NaN.

    Steps:
      1. Every column is passed through ``to_null``, which turns NULL, NaN and
         blank strings into NULL. ``na.drop()`` then removes every row that has
         a NULL in any column.
      2. In each column listed in ``zero_price_columns``, a value equal to 0 is
         replaced with NaN. This happens after step 1, so those rows are kept
         and carry NaN. The assumption is that a 0 price means "price
         unavailable". NOTE(review): confirm.

    Args:
        swap_df: Spark DataFrame of swap transactions.
        zero_price_columns: Names of the price columns in which 0 is treated as
            missing. The default matches the columns previously hard-coded here.

    Returns:
        The cleaned Spark DataFrame.
    """
    swap_df_cleaned = swap_df.select([to_null(c).alias(c) for c in swap_df.columns]).na.drop()

    for price_col in zero_price_columns:
        swap_df_cleaned = swap_df_cleaned.withColumn(
            price_col,
            when(col(price_col) == 0, float('nan')).otherwise(col(price_col)),
        )

    return swap_df_cleaned

In [4]:
def combine_dataframes(block_df, transaction_df, swap_df_cleaned):
    """Join blocks, transactions and cleaned swaps into one DataFrame.

    ``Gas_Limit``, ``Gas_Used`` and ``Timestamp_Block`` are removed from the
    block table first. Presumably this avoids duplicate or unwanted columns
    after the join. NOTE(review): confirm.

    The block table is then joined with the transactions on ``Block_ID``, and
    the result is joined with the swaps on ``Hash_Transaction``. Both joins use
    Spark's default inner join.

    Returns:
        The joined Spark DataFrame.
    """
    blocks_trimmed = block_df.drop("Gas_Limit", "Gas_Used", "Timestamp_Block")
    blocks_with_txs = blocks_trimmed.join(transaction_df, "Block_ID")
    return blocks_with_txs.join(swap_df_cleaned, "Hash_Transaction")

In [5]:
# NOTE(review): output_folder is not referenced in this cell; the CSV loaders read from the same relative path.
output_folder = "../Scripts/output"

# Make the Spark Python workers and the driver use this kernel's interpreter.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# Width of the Block_ID window loaded by get_dataframes.
num_last_blocks = 5000

# getOrCreate() reuses an existing session if one is already running. Driver
# memory settings only apply when the JVM is first launched.
spark = SparkSession.builder \
    .appName("Análise de Transações Ethereum") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()


# Use the configured constant instead of repeating the literal 5000.
block_df, transaction_df, swap_df = get_dataframes(num_last_blocks)

swap_df_cleaned = clean_swap_dataframe(swap_df)
df_combined = combine_dataframes(block_df, transaction_df, swap_df_cleaned)

In [6]:
# Bring the joined Spark result to the driver as a pandas DataFrame and save it
# as CSV in the current working directory.
# NOTE(review): toPandas() collects every row into driver memory.
df_combined_pandas = df_combined.toPandas()
df_combined_pandas.to_csv(path_or_buf="combined_df.csv", index=False)