# Notebook: 04_transformar_dados.ipynb

Este notebook tem como objetivo:
1. Ler o arquivo `nafld1.csv` do bucket `raw` no MinIO.
2. Transformar o arquivo para o formato Parquet e salvá-lo no bucket `bronze`.
3. Realizar uma análise inicial exploratória (EDA) para entender melhor os dados.

In [2]:
from pyspark.sql import SparkSession

# Configuração do Spark para trabalhar com MinIO
spark = SparkSession.builder \
    .appName("Transformação de Dados") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minio") \
    .config("spark.hadoop.fs.s3a.secret.key", "minio123") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Ler o arquivo CSV do bucket 'raw', ignorando o cabeçalho original
file_path = "s3a://raw/nafld1.csv"

try:
    df = spark.read.csv(
        file_path,
        header=False,         # Ignorar o cabeçalho
        inferSchema=True,     # Inferir os tipos de dados
        nullValue="NA"        # Tratar "NA" como valores nulos
    )
    print(f"Arquivo '{file_path}' carregado com sucesso!")
except Exception as e:
    print(f"Erro ao carregar o arquivo CSV: {e}")

# Exibir as colunas detectadas inicialmente
print("Colunas detectadas automaticamente:", df.columns)

# Renomear as colunas (ajustar manualmente caso tenha uma a mais)
colunas = ["_c0", "id", "age", "male", "weight", "height", "bmi", "case_id", "futime", "status"]

if len(df.columns) == len(colunas):  # Verifica se o número de colunas corresponde
    df = df.toDF(*colunas)  # Renomear com os nomes definidos
    # Remover a coluna desnecessária se existir
    if "_c0" in df.columns:
        df = df.drop("_c0")  # Remova _c0 caso seja apenas um índice ou algo irrelevante
else:
    print("Aviso: Número de colunas detectado não corresponde ao esperado.")

# Exibir as primeiras linhas do DataFrame
df.show(5, truncate=False)

# Exibir o esquema do DataFrame
df.printSchema()

# Contar o número de linhas e colunas
print(f"Número de linhas: {df.count()}, Número de colunas: {len(df.columns)}")

# Exibir estatísticas descritivas
df.describe().show()

your 131072x1 screen size is bogus. expect trouble
25/03/13 14:24:22 WARN Utils: Your hostname, DESKTOP-78HVMSD resolves to a loopback address: 127.0.1.1; using 172.26.49.215 instead (on interface eth0)
25/03/13 14:24:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/13 14:24:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/13 14:24:25 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


Arquivo 's3a://raw/nafld1.csv' carregado com sucesso!
Colunas detectadas automaticamente: ['_c0', '_c1', '_c2', '_c3', '_c4', '_c5', '_c6', '_c7', '_c8', '_c9']
+---+---+----+------+------+----------------+-------+------+------+
|id |age|male|weight|height|bmi             |case_id|futime|status|
+---+---+----+------+------+----------------+-------+------+------+
|id |age|male|weight|height|bmi             |case.id|futime|status|
|1  |57 |0   |60    |163   |22.6909394821587|10630  |6261  |0     |
|2  |67 |0   |70.4  |168   |24.8840277061027|14817  |624   |0     |
|3  |53 |1   |105.8 |186   |30.4535370503297|3      |1783  |0     |
|4  |56 |1   |109.3 |170   |37.8300998173156|6628   |3143  |0     |
+---+---+----+------+------+----------------+-------+------+------+
only showing top 5 rows

root
 |-- id: string (nullable = true)
 |-- age: string (nullable = true)
 |-- male: string (nullable = true)
 |-- weight: string (nullable = true)
 |-- height: string (nullable = true)
 |-- bmi: string

25/03/13 14:24:34 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 6:>                                                          (0 + 1) / 1]

+-------+-----------------+------------------+-------------------+------------------+------------------+------------------+-----------------+-----------------+-------------------+
|summary|               id|               age|               male|            weight|            height|               bmi|          case_id|           futime|             status|
+-------+-----------------+------------------+-------------------+------------------+------------------+------------------+-----------------+-----------------+-------------------+
|  count|            17550|             17550|              17550|             12764|             14382|             12589|            17519|            17550|              17550|
|   mean|8784.215966721751| 52.65963872585333|0.46732007521796115| 86.35334952597289|169.43494889089772|30.073864900657256|  8840.9244776801|2410.600547039717|0.07772522650863298|
| stddev|5070.970712376356|14.722515127574011| 0.4989450954701136|22.239458031175896|10.141421399552

                                                                                

In [3]:
# Caminho para salvar o arquivo Parquet no bucket 'bronze'
output_path = "s3a://bronze/nafld1.parquet"

try:
    # Salvar o DataFrame em formato Parquet
    df.write.mode("overwrite").parquet(output_path)
    print(f"Arquivo salvo com sucesso no bucket 'bronze' em {output_path}.")
except Exception as e:
    print(f"Erro ao salvar o arquivo Parquet no bucket 'bronze': {e}")

[Stage 9:>                                                          (0 + 1) / 1]

Arquivo salvo com sucesso no bucket 'bronze' em s3a://bronze/nafld1.parquet.


                                                                                

In [4]:
# Carregar o arquivo Parquet do bucket 'bronze'
try:
    parquet_df = spark.read.parquet("s3a://bronze/nafld1.parquet")
    print("Arquivo Parquet carregado com sucesso do bucket 'bronze'!")
    # Exibir as primeiras linhas do DataFrame Parquet
    parquet_df.show(5, truncate=False)
    # Exibir o esquema para confirmar os tipos de dados
    parquet_df.printSchema()
except Exception as e:
    print(f"Erro ao carregar o arquivo Parquet do bucket 'bronze': {e}")


Arquivo Parquet carregado com sucesso do bucket 'bronze'!
+---+---+----+------+------+----------------+-------+------+------+
|id |age|male|weight|height|bmi             |case_id|futime|status|
+---+---+----+------+------+----------------+-------+------+------+
|id |age|male|weight|height|bmi             |case.id|futime|status|
|1  |57 |0   |60    |163   |22.6909394821587|10630  |6261  |0     |
|2  |67 |0   |70.4  |168   |24.8840277061027|14817  |624   |0     |
|3  |53 |1   |105.8 |186   |30.4535370503297|3      |1783  |0     |
|4  |56 |1   |109.3 |170   |37.8300998173156|6628   |3143  |0     |
+---+---+----+------+------+----------------+-------+------+------+
only showing top 5 rows

root
 |-- id: string (nullable = true)
 |-- age: string (nullable = true)
 |-- male: string (nullable = true)
 |-- weight: string (nullable = true)
 |-- height: string (nullable = true)
 |-- bmi: string (nullable = true)
 |-- case_id: string (nullable = true)
 |-- futime: string (nullable = true)
 |-- st