In [0]:
##
access_key = 'xxxxxxx'
secret_key = 'xxxxxxxxxxxxxx'
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)

# If you are using Auto Loader file notification mode to load files, provide the AWS Region ID.
aws_region = "us-east-2"
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3." + aws_region + ".amazonaws.com")



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, to_date

In [0]:
def read_file(path):
    df = spark.read.csv(path, inferSchema=True, header=True)
    return df

In [0]:
# Função para medir o tempo de execução
def measure_time(func, *args):
    import time
    start_time = time.time()
    result = func(*args)
    end_time = time.time()
    print(f"Tempo de execução para {func.__name__}: {end_time - start_time:.4f} segundos")
    return result

In [0]:
# Função para limpar os dados
def clean_dataframe(df):
    df = df.withColumn("amount", regexp_replace(col("amount"), "\\$", "").cast("double"))
    df = df.withColumn("date", to_date(col("date"), "yyyy-MM-dd HH:mm:ss"))
    df = df.withColumn("use_chip", regexp_replace(col("use_chip"), "Swipe Transaction", "False"))
    df = df.withColumn("use_chip", regexp_replace(col("use_chip"), "Online Transaction", "True"))
    return df

In [0]:
# Função para ordenar o DataFrame
def sort_dataframe(df):
    return df.orderBy(col("amount").desc())

In [0]:
# Função para agrupar o DataFrame
# Exemplo: Contar o número de transações por estado do comerciante
def group_dataframe(df):
    return df.groupBy("merchant_state").count()

In [0]:
# Função para filtrar o DataFrame
# Exemplo: Filtrar transações online com valor acima de 100
def filter_dataframe(df):
    return df.filter((col("use_chip") == "True") & (col("amount") > 100))

In [0]:
# Função para salvar o DataFrame
def save_file(df):
    return df.write.option("header", True).csv('output_path_large.csv')

In [0]:
# Aplicar as transformações com medição de tempo
df = measure_time(read_file, 's3a://financial-dataset-ada/transactions_data.csv')
df_cleaned = measure_time(clean_dataframe, df)
df_sorted = measure_time(sort_dataframe, df_cleaned)
df_grouped = measure_time(group_dataframe, df_cleaned)
df_filtered = measure_time(filter_dataframe, df_cleaned)
df_save = measure_time(save_file, df_cleaned)

Tempo de execução para read_file: 34.0868 segundos
Tempo de execução para clean_dataframe: 0.0975 segundos
Tempo de execução para sort_dataframe: 0.0128 segundos
Tempo de execução para group_dataframe: 0.0134 segundos
Tempo de execução para filter_dataframe: 0.0190 segundos
Tempo de execução para save_file: 108.4828 segundos


In [0]:
# COMMAND ----------

# Mostrar os resultados
df.show()
df_cleaned.show()
df_sorted.show()
df_grouped.show()
df_filtered.show()

+-------+-------------------+---------+-------+-------+------------------+-----------+---------------+--------------+-------+----+------+
|     id|               date|client_id|card_id| amount|          use_chip|merchant_id|  merchant_city|merchant_state|    zip| mcc|errors|
+-------+-------------------+---------+-------+-------+------------------+-----------+---------------+--------------+-------+----+------+
|7475327|2010-01-01 00:01:00|     1556|   2972|$-77.00| Swipe Transaction|      59935|         Beulah|            ND|58523.0|5499|  null|
|7475328|2010-01-01 00:02:00|      561|   4575| $14.57| Swipe Transaction|      67570|     Bettendorf|            IA|52722.0|5311|  null|
|7475329|2010-01-01 00:02:00|     1129|    102| $80.00| Swipe Transaction|      27092|          Vista|            CA|92084.0|4829|  null|
|7475331|2010-01-01 00:05:00|      430|   2860|$200.00| Swipe Transaction|      27092|    Crown Point|            IN|46307.0|4829|  null|
|7475332|2010-01-01 00:06:00|     