# Desafio

## Enunciado

🧩 Desafio: ETL de Dados Públicos

🎯 Objetivo Construir um pipeline ETL no Apache Airflow que:

Extrai dados de algum dataset público

Transforma os dados limpando, filtrando e unificando informações.

Carrega o resultado em um banco de dados PostgreSQL (ou salva como Parquet).

Agenda a execução diária do pipeline (simulando ingestão incremental).

### Imports

In [1]:
import mlflow
import kagglehub
import os

import pandas as pd

from sklearn.preprocessing import MinMaxScaler, StandardScaler

  from .autonotebook import tqdm as notebook_tqdm


### Constants

In [2]:
KAGGLE_HUB_DATASET="ahmadrazakashif/bmw-worldwide-sales-records-20102024"
MLFLOW_EXPERIMENT_NAME="etl_public_dataset"
MLFLOW_RUN_NAME="jupyter_run"

### Configurar e iniciar o mlflow

In [3]:
mlflow.set_experiment(MLFLOW_EXPERIMENT_NAME)
mlflow.start_run(run_name=MLFLOW_RUN_NAME)


<ActiveRun: >

### Utilitários do mlflow

In [4]:
artifacts_path = "artifacts"
os.makedirs(artifacts_path, exist_ok=True)  # cria se não existir

def log_artifact(df: pd.DataFrame, name: str):
    path = os.path.join(artifacts_path, f"{name}.parquet")
    df.to_parquet(path, index=False)
    mlflow.log_artifact(path, artifact_path="data")

### Baixar data set

In [5]:
mlflow.log_param("dataset", KAGGLE_HUB_DATASET)
dataset_path = kagglehub.dataset_download(KAGGLE_HUB_DATASET)
print("Path do dataset:", dataset_path)
print("Arquivos do dataset:", os.listdir(dataset_path))
dataset_csv_path = os.path.join(dataset_path, os.listdir(dataset_path)[0])
print("Path do csv do dataset:", dataset_csv_path)

Path do dataset: /Users/vand/.cache/kagglehub/datasets/ahmadrazakashif/bmw-worldwide-sales-records-20102024/versions/1
Arquivos do dataset: ['BMW sales data (2010-2024) (1).csv']
Path do csv do dataset: /Users/vand/.cache/kagglehub/datasets/ahmadrazakashif/bmw-worldwide-sales-records-20102024/versions/1/BMW sales data (2010-2024) (1).csv


### Carrega dataset

In [6]:
df_raw = pd.read_csv(dataset_csv_path, engine="python")
df_raw.head()

log_artifact(df_raw, "raw")

### Infos do dataset?

In [7]:
df_raw.info()
display('Dimensões:', df_raw.shape)
display(df_raw.describe())
display("Itens nulos:", df_raw.isnull().sum().sort_values(ascending=False))

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 50000 entries, 0 to 49999
Data columns (total 11 columns):
 #   Column                Non-Null Count  Dtype  
---  ------                --------------  -----  
 0   Model                 50000 non-null  object 
 1   Year                  50000 non-null  int64  
 2   Region                50000 non-null  object 
 3   Color                 50000 non-null  object 
 4   Fuel_Type             50000 non-null  object 
 5   Transmission          50000 non-null  object 
 6   Engine_Size_L         50000 non-null  float64
 7   Mileage_KM            50000 non-null  int64  
 8   Price_USD             50000 non-null  int64  
 9   Sales_Volume          50000 non-null  int64  
 10  Sales_Classification  50000 non-null  object 
dtypes: float64(1), int64(4), object(6)
memory usage: 4.2+ MB


'Dimensões:'

(50000, 11)

Unnamed: 0,Year,Engine_Size_L,Mileage_KM,Price_USD,Sales_Volume
count,50000.0,50000.0,50000.0,50000.0,50000.0
mean,2017.0157,3.24718,100307.20314,75034.6009,5067.51468
std,4.324459,1.009078,57941.509344,25998.248882,2856.767125
min,2010.0,1.5,3.0,30000.0,100.0
25%,2013.0,2.4,50178.0,52434.75,2588.0
50%,2017.0,3.2,100388.5,75011.5,5087.0
75%,2021.0,4.1,150630.25,97628.25,7537.25
max,2024.0,5.0,199996.0,119998.0,9999.0


'Itens nulos:'

Model                   0
Year                    0
Region                  0
Color                   0
Fuel_Type               0
Transmission            0
Engine_Size_L           0
Mileage_KM              0
Price_USD               0
Sales_Volume            0
Sales_Classification    0
dtype: int64

### Removendo duplicados

In [8]:
df_clear = df_raw.copy()

rows_before = df_clear.shape[0]
df_clear.drop_duplicates(inplace=True)
rows_after = df_clear.shape[0]
print(f'Duplicatas removidas: {rows_before - rows_after}')

Duplicatas removidas: 0


### Tratamento para nulos

In [9]:
df_clear = df_clear.dropna(subset=["Model"])
mlflow.log_param("remove-null-Model", True)

df_clear = df_clear.dropna(subset=["Year"])
mlflow.log_param("remove-null-Year", True)

df_clear = df_clear.dropna(subset=["Region"])
mlflow.log_param("remove-null-Region", True)

color_mode = df_clear["Color"].mode()[0]
mlflow.log_param("imput-Color", "mode")

fuel_type_mode = df_clear["Fuel_Type"].mode()[0]
mlflow.log_param("imput-Fuel_Type", "mode")

transmission_mode = df_clear["Transmission"].mode()[0]
mlflow.log_param("imput-Transmission", "mode")

engine_size_l_median = df_clear["Engine_Size_L"].median()
mlflow.log_param("imput-Engine_Size_L", "median")

mileage_km_median = df_clear["Mileage_KM"].median()
mlflow.log_param("imput-Mileage_KM", "median")

sales_volume_median = df_clear["Sales_Volume"].median()
mlflow.log_param("imput-Sales_Volume", "median")

sales_classification_mode = df_clear["Sales_Classification"].mode()[0]
mlflow.log_param("imput-Sales_Classification", "mode")

# Aplica os tratamentos
coluns_tretament = {
    "Color": color_mode,
    "Fuel_Type": fuel_type_mode,
    "Transmission": transmission_mode,
    "Engine_Size_L": engine_size_l_median,
    "Mileage_KM": mileage_km_median,
    "Sales_Volume": sales_volume_median,
    "Sales_Classification": sales_classification_mode
}
df_clear = df_clear.fillna(coluns_tretament)

log_artifact(df_clear, "non-null")

### Transformação de variáveis categoricas usando on hot encoding

In [10]:
columns_to_get_dummies = ["Year",
                          "Region",
                          "Color",
                          "Fuel_Type",
                          "Transmission",
                          "Sales_Classification"]
df_clear = pd.get_dummies(df_clear, columns=columns_to_get_dummies, prefix="dummie")
mlflow.log_param("insert-Year", "dummies")
mlflow.log_param("insert-Region", "dummies")
mlflow.log_param("insert-Color", "dummies")
mlflow.log_param("insert-Fuel_Type", "dummies")
mlflow.log_param("insert-Transmission", "dummies")
mlflow.log_param("insert-Sales_Classification", "dummies")

log_artifact(df_clear, "transformed_dummies")

### Normalização de variáveis numéricas

In [11]:
# StandardScaler
columns_to_scale_std = ["Engine_Size_L"]
scaled_std_values = StandardScaler().fit_transform(df_clear[columns_to_scale_std])
mlflow.log_param("normalized-Engine_Size_L", "StandardScaler")

std_scaled_columns = [f"StandardScalerScaled_{x}" for x in columns_to_scale_std]
df_std_scaled_values = pd.DataFrame(scaled_std_values, columns=std_scaled_columns, index=df_clear.index)

# MinMaxScaler
columns_to_scale_minmax = ['Mileage_KM', 'Price_USD', 'Sales_Volume']
scaled_minmax_values = MinMaxScaler().fit_transform(df_clear[columns_to_scale_minmax])

mlflow.log_param("normalized-Mileage_KM", "MinMaxScaler")
mlflow.log_param("normalized-Price_USD", "MinMaxScaler")
mlflow.log_param("normalized-Sales_Volume", "MinMaxScaler")

minmax_scaled_columns = [f"MinMaxScaled_{x}" for x in columns_to_scale_minmax]
df_minmax_scaled_values = pd.DataFrame(scaled_minmax_values, columns=minmax_scaled_columns, index=df_clear.index)

df_clear = pd.concat([df_clear, df_minmax_scaled_values, df_std_scaled_values], axis=1)

log_artifact(df_clear, "scaled")

## Armazene esses valores como um artefato dentro do MLFlow

In [12]:
processed_data_path = "artifacts/dataset_processed.csv"
df_clear.to_csv(processed_data_path, index=False)
print("Dataset processado salvo localmente.")

mlflow.log_artifact(processed_data_path)
mlflow.end_run()

Dataset processado salvo localmente.
