# Feature Engineering Pipeline com Apache Airflow

## Introdução

Bem-vindos a este tutorial sobre Feature Engineering e Apache Airflow! Neste notebook, vamos explorar como construir um pipeline de engenharia de features utilizando dados climáticos como exemplo prático.

### Objetivos de Aprendizado

Ao final deste tutorial, você será capaz de:
- Compreender os conceitos fundamentais de feature engineering
- Aplicar técnicas de feature engineering usando scikit-learn
- Entender como estruturar um pipeline ETL com Apache Airflow
- Implementar transformações de dados como polynomial features e one-hot encoding
- Conhecer o básico sobre feature stores com Feast (opcional)

### Estrutura do Tutorial

1. **Conceitos Básicos de Feature Engineering**
2. **Técnicas de Feature Engineering com scikit-learn**
3. **Implementação de um Pipeline ETL com Apache Airflow**
4. **Desafio: Integração com Feast Feature Store**

Vamos começar!

## 1. Conceitos Básicos de Feature Engineering

### O que é Feature Engineering?

Feature Engineering é o processo de transformar dados brutos em features (características) que representam melhor o problema subjacente para os algoritmos de machine learning, resultando em melhor desempenho do modelo.

É considerada uma das etapas mais importantes e trabalhosas no desenvolvimento de modelos de machine learning, pois a qualidade das features tem impacto direto na qualidade do modelo final.

### Por que Feature Engineering é importante?

1. **Melhora o desempenho do modelo**: Features bem projetadas podem capturar relações importantes nos dados que algoritmos simples podem não detectar.

2. **Reduz a complexidade do modelo**: Com features melhores, você pode usar modelos mais simples e ainda obter bons resultados.

3. **Incorpora conhecimento de domínio**: Permite que especialistas no assunto contribuam com seu conhecimento para o processo de machine learning.

4. **Lida com dados problemáticos**: Ajuda a tratar valores ausentes, outliers e outras anomalias nos dados.

### Processo de Feature Engineering

O processo de feature engineering geralmente envolve as seguintes etapas:

1. **Exploração e Compreensão dos Dados**: Entender a natureza dos dados e identificar padrões.

2. **Limpeza de Dados**: Tratar valores ausentes, outliers e erros.

3. **Transformação de Features**: Aplicar operações matemáticas para criar novas features (ex: polinomiais).

4. **Codificação de Variáveis Categóricas**: Converter variáveis categóricas em numéricas (ex: one-hot encoding).

5. **Seleção de Features**: Escolher as features mais relevantes para o modelo.

6. **Normalização/Padronização**: Ajustar a escala das features para melhorar o desempenho do modelo.

No nosso exemplo com dados climáticos, vamos aplicar várias dessas técnicas para criar um conjunto de features útil para análise e modelagem.

## 2. Técnicas de Feature Engineering com scikit-learn

Vamos explorar algumas das técnicas de feature engineering mais comuns usando a biblioteca scikit-learn. Para nosso exemplo, usaremos dados climáticos da API OpenWeatherMap.

### Configuração Inicial

Primeiro, vamos importar as bibliotecas necessárias:

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import requests
import json
from sklearn.preprocessing import PolynomialFeatures, OneHotEncoder, StandardScaler
from sklearn.feature_selection import VarianceThreshold

# Configurações de visualização
plt.style.use('seaborn-v0_8-whitegrid')
sns.set_palette('viridis')
%matplotlib inline

### Obtenção dos Dados Climáticos

Para este tutorial, vamos simular a obtenção de dados da API OpenWeatherMap. Em um ambiente real, você precisaria de uma chave de API válida.

In [None]:
# Simulação de dados climáticos para várias cidades
# Em um ambiente real, você faria uma chamada à API OpenWeatherMap

def simulate_weather_data(cities):
    """Simula dados climáticos para uma lista de cidades."""
    np.random.seed(42)  # Para reprodutibilidade
    weather_conditions = ['Clear', 'Clouds', 'Rain', 'Snow', 'Thunderstorm', 'Drizzle', 'Mist']
    
    data = []
    for city in cities:
        for day in range(10):  # Simula 10 dias de dados
            data.append({
                'city': city,
                'date': pd.Timestamp('2023-10-01') + pd.Timedelta(days=day),
                'temp': np.random.uniform(5, 35),  # Temperatura em Celsius
                'feels_like': np.random.uniform(3, 37),  # Sensação térmica
                'temp_min': np.random.uniform(2, 30),  # Temperatura mínima
                'temp_max': np.random.uniform(10, 40),  # Temperatura máxima
                'pressure': np.random.uniform(990, 1030),  # Pressão atmosférica
                'humidity': np.random.uniform(30, 100),  # Umidade
                'wind_speed': np.random.uniform(0, 20),  # Velocidade do vento
                'wind_deg': np.random.uniform(0, 360),  # Direção do vento
                'clouds': np.random.uniform(0, 100),  # Cobertura de nuvens
                'weather_condition': np.random.choice(weather_conditions)  # Condição climática
            })
    
    return pd.DataFrame(data)

# Lista de cidades para simular dados
cities = ['New York', 'London', 'Tokyo', 'Sydney', 'Rio de Janeiro', 'Paris', 'Moscow', 'Cairo']

# Gerar dados simulados
weather_df = simulate_weather_data(cities)

# Exibir as primeiras linhas do DataFrame
weather_df.head()

### Explorando os Dados

Antes de aplicar técnicas de feature engineering, é importante entender os dados com os quais estamos trabalhando.

In [None]:
# Informações básicas sobre o DataFrame
print("Informações do DataFrame:")
weather_df.info()

print("\nEstatísticas descritivas:")
weather_df.describe()

In [None]:
# Visualizar a distribuição de algumas variáveis numéricas
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

sns.histplot(weather_df['temp'], kde=True, ax=axes[0, 0])
axes[0, 0].set_title('Distribuição de Temperatura')

sns.histplot(weather_df['humidity'], kde=True, ax=axes[0, 1])
axes[0, 1].set_title('Distribuição de Umidade')

sns.histplot(weather_df['wind_speed'], kde=True, ax=axes[1, 0])
axes[1, 0].set_title('Distribuição de Velocidade do Vento')

sns.histplot(weather_df['pressure'], kde=True, ax=axes[1, 1])
axes[1, 1].set_title('Distribuição de Pressão Atmosférica')

plt.tight_layout()
plt.show()

In [None]:
# Visualizar a contagem de condições climáticas
plt.figure(figsize=(10, 6))
sns.countplot(y='weather_condition', data=weather_df)
plt.title('Contagem de Condições Climáticas')
plt.tight_layout()
plt.show()

### Criação de Features Relacionadas ao Tempo

Vamos extrair informações temporais da coluna de data, como dia da semana, mês, estação do ano, etc.

In [None]:
# Extrair features temporais
weather_df['day_of_week'] = weather_df['date'].dt.dayofweek
weather_df['is_weekend'] = weather_df['day_of_week'].apply(lambda x: 1 if x >= 5 else 0)  # 5=Sábado, 6=Domingo
weather_df['month'] = weather_df['date'].dt.month
weather_df['day'] = weather_df['date'].dt.day

# Definir estação do ano (simplificado para hemisfério norte)
def get_season(month):
    if month in [12, 1, 2]:
        return 'Winter'
    elif month in [3, 4, 5]:
        return 'Spring'
    elif month in [6, 7, 8]:
        return 'Summer'
    else:  # 9, 10, 11
        return 'Fall'

weather_df['season'] = weather_df['month'].apply(get_season)

# Exibir as novas colunas
weather_df[['date', 'day_of_week', 'is_weekend', 'month', 'day', 'season']].head()

### Aplicando Polynomial Features

As polynomial features são úteis para capturar relações não-lineares nos dados. Vamos aplicá-las às variáveis numéricas relacionadas ao clima.

In [None]:
# Selecionar colunas numéricas para aplicar polynomial features
numerical_cols = ['temp', 'humidity', 'wind_speed', 'pressure']
X_numerical = weather_df[numerical_cols]

# Aplicar polynomial features (grau 2)
poly = PolynomialFeatures(degree=2, include_bias=False)
poly_features = poly.fit_transform(X_numerical)

# Obter os nomes das features polinomiais
poly_feature_names = poly.get_feature_names_out(numerical_cols)

# Criar DataFrame com as features polinomiais
poly_df = pd.DataFrame(poly_features, columns=poly_feature_names)

# Exibir as primeiras linhas do DataFrame com features polinomiais
poly_df.head()

### Aplicando One-Hot Encoding

O one-hot encoding é usado para converter variáveis categóricas em formato numérico, criando uma coluna binária para cada categoria.

In [None]:
# Selecionar colunas categóricas para aplicar one-hot encoding
categorical_cols = ['weather_condition', 'season', 'city']
X_categorical = weather_df[categorical_cols]

# Aplicar one-hot encoding
encoder = OneHotEncoder(sparse=False, drop='first')  # drop='first' para evitar multicolinearidade
encoded_features = encoder.fit_transform(X_categorical)

# Obter os nomes das features codificadas
encoded_feature_names = encoder.get_feature_names_out(categorical_cols)

# Criar DataFrame com as features codificadas
encoded_df = pd.DataFrame(encoded_features, columns=encoded_feature_names)

# Exibir as primeiras linhas do DataFrame com features codificadas
encoded_df.head()

### Criando Features de Interação

As features de interação podem capturar relações importantes entre diferentes variáveis.

In [None]:
# Criar algumas features de interação manualmente
weather_df['temp_humidity_ratio'] = weather_df['temp'] / weather_df['humidity']
weather_df['heat_index'] = weather_df['temp'] * (1 + 0.01 * weather_df['humidity'])  # Simplificação do índice de calor
weather_df['wind_chill'] = 13.12 + 0.6215 * weather_df['temp'] - 11.37 * (weather_df['wind_speed'] ** 0.16) + 0.3965 * weather_df['temp'] * (weather_df['wind_speed'] ** 0.16)  # Fórmula simplificada

# Exibir as novas features
weather_df[['temp', 'humidity', 'wind_speed', 'temp_humidity_ratio', 'heat_index', 'wind_chill']].head()

### Combinando Todas as Features

Agora vamos combinar todas as features que criamos em um único DataFrame.

In [None]:
# Selecionar features originais que queremos manter
original_features = ['city', 'date', 'temp', 'humidity', 'wind_speed', 'pressure', 'weather_condition']
original_df = weather_df[original_features].copy()

# Selecionar features temporais
time_features = ['day_of_week', 'is_weekend', 'month', 'day', 'season']
time_df = weather_df[time_features].copy()

# Selecionar features de interação
interaction_features = ['temp_humidity_ratio', 'heat_index', 'wind_chill']
interaction_df = weather_df[interaction_features].copy()

# Combinar todos os DataFrames
# Nota: Não incluímos as features polinomiais e one-hot encoding diretamente para evitar um DataFrame muito grande
combined_df = pd.concat([original_df, time_df, interaction_df], axis=1)

# Exibir as primeiras linhas do DataFrame combinado
combined_df.head()

### Seleção de Features

A seleção de features é importante para reduzir a dimensionalidade e melhorar o desempenho do modelo. Vamos usar o método de limiar de variância para selecionar features.

In [None]:
# Preparar dados para seleção de features
# Primeiro, precisamos converter todas as colunas para numéricas
# Vamos usar apenas as colunas numéricas e as que já codificamos

# Selecionar colunas numéricas do DataFrame combinado
numeric_columns = combined_df.select_dtypes(include=['number']).columns
X_for_selection = combined_df[numeric_columns].copy()

# Normalizar os dados para que a variância seja comparável
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_for_selection)
X_scaled_df = pd.DataFrame(X_scaled, columns=X_for_selection.columns)

# Aplicar seleção por limiar de variância
selector = VarianceThreshold(threshold=0.01)  # Remover features com variância < 0.01
X_selected = selector.fit_transform(X_scaled_df)

# Obter os nomes das features selecionadas
selected_features = X_for_selection.columns[selector.get_support()]
print(f"Features originais: {len(X_for_selection.columns)}")
print(f"Features selecionadas: {len(selected_features)}")
print("\nFeatures selecionadas:")
print(selected_features.tolist())

In [None]:
# Criar DataFrame final com as features selecionadas
final_df = combined_df[['city', 'date']].copy()  # Manter colunas de identificação
for feature in selected_features:
    if feature in combined_df.columns:
        final_df[feature] = combined_df[feature]

# Exibir as primeiras linhas do DataFrame final
final_df.head()

## 3. Implementação de um Pipeline ETL com Apache Airflow

Agora que entendemos as técnicas de feature engineering, vamos ver como implementar um pipeline ETL completo usando Apache Airflow.

### O que é Apache Airflow?

Apache Airflow é uma plataforma de código aberto para criar, agendar e monitorar fluxos de trabalho programaticamente. Com o Airflow, você pode definir seus pipelines de dados como código, tornando-os mais fáceis de manter, versionar e escalar.

### Componentes Principais do Airflow

- **DAG (Directed Acyclic Graph)**: Representa um fluxo de trabalho como um grafo acíclico direcionado de tarefas.
- **Operators**: Definem o que realmente é executado em cada tarefa (ex: PythonOperator, BashOperator).
- **Tasks**: Instâncias parametrizadas de operadores.
- **Task Dependencies**: Definem a ordem de execução das tarefas.

### Estrutura do nosso Pipeline ETL

Nosso pipeline ETL para feature engineering seguirá estas etapas:

1. **Extract**: Obter dados climáticos da API OpenWeatherMap.
2. **Transform**: Aplicar técnicas de feature engineering aos dados.
3. **Load**: Armazenar os resultados em um banco de dados MongoDB.

Vamos examinar o código do DAG que implementa este pipeline:

In [None]:
# Este é o código do DAG do Airflow
# Em um ambiente Jupyter, não podemos executar diretamente o Airflow
# Este código é apenas para fins educacionais


import json
import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.models import Variable
from datetime import datetime, timedelta
from sklearn.preprocessing import PolynomialFeatures, OneHotEncoder
from sklearn.feature_selection import VarianceThreshold
import numpy as np
from pymongo import MongoClient

# Define functions outside the DAG block

def parse_weather_response(**context):
    # Parse the HTTP response into a DataFrame.
    response = context["task_instance"].xcom_pull(task_ids="fetch_weather")
    data = json.loads(response)
    weather_data = {
        "temp": data["main"]["temp"],
        "humidity": data["main"]["humidity"],
        "wind_speed": data["wind"]["speed"],
        "weather_condition": data["weather"][0]["main"]
    }
    df = pd.DataFrame([weather_data])
    context["task_instance"].xcom_push(key="weather_df", value=df.to_json())

def apply_polynomial_features(**context):
    # Apply polynomial features to numerical columns.
    df_json = context["task_instance"].xcom_pull(task_ids="parse_weather", key="weather_df")
    df = pd.read_json(df_json)
    poly_degree = context["dag_run"].conf.get("poly_degree", 2)
    numerical_cols = ["temp", "humidity", "wind_speed"]
    poly = PolynomialFeatures(degree=poly_degree, include_bias=False)
    poly_features = poly.fit_transform(df[numerical_cols])
    feature_names = poly.get_feature_names_out(numerical_cols)
    df_poly = pd.DataFrame(poly_features, columns=feature_names)
    context["task_instance"].xcom_push(key="poly_features", value=df_poly.to_json())

def apply_one_hot_encoding(**context):
    # Apply one-hot encoding to categorical columns. 
    df_json = context["task_instance"].xcom_pull(task_ids="parse_weather", key="weather_df")
    df = pd.read_json(df_json)
    categorical_cols = ["weather_condition"]
    encoder = OneHotEncoder(sparse=False, drop="first")
    encoded_features = encoder.fit_transform(df[categorical_cols])
    feature_names = encoder.get_feature_names_out(categorical_cols)
    df_encoded = pd.DataFrame(encoded_features, columns=feature_names)
    context["task_instance"].xcom_push(key="encoded_features", value=df_encoded.to_json())

def combine_features(**context):
    # Combine polynomial and encoded features into a single DataFrame.
    poly_json = context["task_instance"].xcom_pull(task_ids="apply_polynomial", key="poly_features")
    encoded_json = context["task_instance"].xcom_pull(task_ids="apply_encoding", key="encoded_features")
    df_poly = pd.read_json(poly_json)
    df_encoded = pd.read_json(encoded_json)
    df_combined = pd.concat([df_poly, df_encoded], axis=1)
    context["task_instance"].xcom_push(key="combined_features", value=df_combined.to_json())

def feature_selection(**context):
    # Perform variance-based feature selection.
    df_json = context["task_instance"].xcom_pull(task_ids="combine_features", key="combined_features")
    df = pd.read_json(df_json)
    selector = VarianceThreshold(threshold=0.01)  # Remove features with variance < 0.01
    selected_features = selector.fit_transform(df)
    feature_names = df.columns[selector.get_support()]
    df_selected = pd.DataFrame(selected_features, columns=feature_names)
    context["task_instance"].xcom_push(key="selected_features", value=df_selected.to_json())

def load_to_mongodb(**context):
    # Load the transformed data into MongoDB.
    df_json = context["task_instance"].xcom_pull(task_ids="feature_selection", key="selected_features")
    df = pd.read_json(df_json)
    client = MongoClient("mongodb://localhost:27017/")  # Update if using MongoDB Atlas
    db = client["weather_db"]
    collection = db["daily_weather"]
    collection.insert_many(df.to_dict("records"))
    
    # Optional: For advanced users, you could integrate Feast (a feature store) here.
    # Example (commented out):
    # from feast import Client
    # feast_client = Client(core_url="your_feast_core_url")
    # feast_client.apply_entity("location", ...)
    # feast_client.apply_feature_view(weather_features, ...)
    # feast_client.ingest("weather_features", df)

# Define the DAG and operators inside the "with" block
with DAG(
    "daily_weather_etl",
    default_args={
        "owner": "airflow",
        "start_date": datetime(2023, 10, 1),
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    },
    schedule_interval="@daily",
    catchup=False,
    params={"city": "New York", "poly_degree": 2}  # Default parameters
) as dag:

    # Task 1: Fetch weather data from OpenWeatherMap API
    fetch_weather = SimpleHttpOperator(
        task_id="fetch_weather",
        http_conn_id="openweather_conn",  # Connection set in Airflow UI
        endpoint="data/2.5/weather?q={{ dag_run.conf[\'city\'] }}&appid={{ var.value.weather_api_key }}&units=metric",
        method="GET",
        xcom_push=True,  # Push response to XCom
    )

    # Task 2: Parse the weather response
    parse_task = PythonOperator(
        task_id="parse_weather",
        python_callable=parse_weather_response,
        provide_context=True,
    )

    # Task 3: Apply polynomial features
    poly_task = PythonOperator(
        task_id="apply_polynomial",
        python_callable=apply_polynomial_features,
        provide_context=True,
    )

    # Task 4: Apply one-hot encoding
    encode_task = PythonOperator(
        task_id="apply_encoding",
        python_callable=apply_one_hot_encoding,
        provide_context=True,
    )

    # Task 5: Combine features
    combine_task = PythonOperator(
        task_id="combine_features",
        python_callable=combine_features,
        provide_context=True,
    )

    # Task 6: Perform feature selection
    selection_task = PythonOperator(
        task_id="feature_selection",
        python_callable=feature_selection,
        provide_context=True,
    )

    # Task 7: Load data into MongoDB
    load_task = PythonOperator(
        task_id="load_to_db",
        python_callable=load_to_mongodb,
        provide_context=True,
    )

    # Set task dependencies
    fetch_weather >> parse_task
    parse_task >> [poly_task, encode_task]  # Parallel execution
    [poly_task, encode_task] >> combine_task
    combine_task >> selection_task >> load_task


### Explicação do Pipeline ETL

Vamos analisar o pipeline ETL implementado no código acima:

#### 1. Extração (Extract)
- Utilizamos o `SimpleHttpOperator` para fazer uma requisição à API OpenWeatherMap.
- Os parâmetros da requisição (cidade, chave de API) são configurados no Airflow.
- A resposta da API é armazenada usando o sistema XCom do Airflow para compartilhamento entre tarefas.

#### 2. Transformação (Transform)
- **Parsing**: Convertemos a resposta JSON em um DataFrame pandas.
- **Feature Engineering**:
  - Aplicamos polynomial features às variáveis numéricas.
  - Aplicamos one-hot encoding às variáveis categóricas.
  - Combinamos as features transformadas.
  - Realizamos seleção de features baseada em variância.

#### 3. Carregamento (Load)
- Armazenamos as features processadas em um banco de dados MongoDB.
- Opcionalmente, poderíamos integrar com o Feast para gerenciamento de features.

### Fluxo de Execução do DAG

O fluxo de execução do DAG é definido pelas dependências entre as tarefas:

```
fetch_weather >> parse_task
parse_task >> [poly_task, encode_task]  # Execução paralela
[poly_task, encode_task] >> combine_task
combine_task >> selection_task >> load_task
```

Este fluxo pode ser visualizado como:

```
                                  ┌─────────────┐
                                  │ parse_task  │
                                  └──────┬──────┘
                                         │
                                         ▼
┌──────────────┐                ┌────────────────┐                ┌────────────────┐
│ fetch_weather ├───────────────► poly_task      ├────────────────►                │
└──────────────┘                └────────────────┘                │                │
                                                                  │ combine_task   ├───► selection_task ───► load_task
                                ┌────────────────┐                │                │
                                │ encode_task    ├────────────────►                │
                                └────────────────┘                └────────────────┘
```

### Vantagens do Apache Airflow para Feature Engineering

1. **Automação**: Executa o pipeline automaticamente conforme o agendamento definido.
2. **Monitoramento**: Fornece uma interface visual para monitorar a execução do pipeline.
3. **Escalabilidade**: Pode lidar com grandes volumes de dados e pipelines complexos.
4. **Flexibilidade**: Permite definir fluxos de trabalho complexos com execução paralela e dependências.
5. **Recuperação de Falhas**: Oferece mecanismos para lidar com falhas e retentativas.
6. **Versionamento**: O pipeline é definido como código, facilitando o versionamento e a colaboração.

## 4. Desafio: Integração com Feast Feature Store

Como desafio opcional, vamos explorar como integrar nosso pipeline de feature engineering com o Feast, um feature store de código aberto.

### O que é um Feature Store?

Um feature store é um sistema especializado para armazenar, gerenciar e servir features para modelos de machine learning. Ele resolve problemas comuns em pipelines de ML, como:

1. **Consistência entre treinamento e inferência**: Garante que as mesmas transformações sejam aplicadas aos dados de treinamento e produção.
2. **Reutilização de features**: Permite que features sejam compartilhadas entre diferentes modelos e equipes.
3. **Monitoramento**: Facilita o monitoramento de drift e qualidade das features.
4. **Governança**: Fornece linhagem e documentação para features.

### Introdução ao Feast

Feast (Feature Store) é uma ferramenta de código aberto para gerenciar, servir e descobrir features para machine learning. Ele fornece:

- Um registro centralizado de definições de features
- Armazenamento online e offline para features
- APIs para recuperação de features para treinamento e inferência
- Integração com várias fontes de dados e plataformas de ML

### Como Integrar Feast ao Nosso Pipeline

Vamos ver como poderíamos integrar o Feast ao nosso pipeline de feature engineering:

In [None]:
# Este é um exemplo conceitual de como integrar Feast ao nosso pipeline
# Em um ambiente real, você precisaria instalar e configurar o Feast


# Primeiro, definimos as entidades e features no Feast
# Arquivo: feature_repo/feature_definitions.py

from datetime import timedelta
from feast import Entity, Feature, FeatureView, ValueType
from feast.data_source import FileSource

# Define a entidade para nossos dados climáticos
location = Entity(name="location", value_type=ValueType.STRING, description="Localização da medição climática")

# Define a fonte de dados para as features climáticas
weather_source = FileSource(
    path="/path/to/weather_features.parquet",  # Caminho para o arquivo de features
    event_timestamp_column="date",
)

# Define a view de features para os dados climáticos
weather_features_view = FeatureView(
    name="weather_features",
    entities=[location],
    ttl=timedelta(days=3),  # Time-to-live para as features
    features=[
        Feature(name="temp", dtype=ValueType.FLOAT),
        Feature(name="humidity", dtype=ValueType.FLOAT),
        Feature(name="wind_speed", dtype=ValueType.FLOAT),
        Feature(name="temp_squared", dtype=ValueType.FLOAT),
        Feature(name="humidity_squared", dtype=ValueType.FLOAT),
        Feature(name="temp_humidity", dtype=ValueType.FLOAT),
        Feature(name="is_rainy", dtype=ValueType.INT32),
        Feature(name="is_sunny", dtype=ValueType.INT32),
    ],
    online=True,
    input=weather_source,
    tags={"team": "weather_analytics"},
)

# Agora, modificamos a função load_to_mongodb no nosso DAG para incluir a integração com Feast

def load_to_feast(**context):
    # Load the transformed data into Feast feature store.
    from feast import Client
    import pandas as pd
    from datetime import datetime
    
    # Obter as features processadas
    df_json = context["task_instance"].xcom_pull(task_ids="feature_selection", key="selected_features")
    df = pd.read_json(df_json)
    
    # Adicionar colunas necessárias para o Feast
    df["location"] = context["dag_run"].conf.get("city", "New York")  # Entidade
    df["date"] = datetime.now()  # Timestamp do evento
    
    # Salvar o DataFrame em um formato que o Feast possa ler
    parquet_path = "/path/to/weather_features.parquet"
    df.to_parquet(parquet_path)
    
    # Conectar ao Feast e aplicar as definições de features
    feast_client = Client(repo_path="/path/to/feature_repo")
    feast_client.apply()  # Aplica as definições de features
    
    # Ingerir os dados no feature store
    feast_client.materialize_incremental(datetime.now() - timedelta(days=1), datetime.now())
    
    return "Features loaded to Feast successfully"

# Adicionar a tarefa ao DAG
load_feast_task = PythonOperator(
    task_id="load_to_feast",
    python_callable=load_to_feast,
    provide_context=True,
)

# Modificar as dependências
selection_task >> load_feast_task

### Benefícios da Integração com Feast

Integrar nosso pipeline de feature engineering com o Feast traz vários benefícios:

1. **Consistência**: Garante que as mesmas transformações sejam aplicadas aos dados de treinamento e produção.
2. **Reutilização**: Permite que as features sejam facilmente reutilizadas em diferentes modelos.
3. **Escalabilidade**: Facilita o gerenciamento de features em projetos de grande escala.
4. **Monitoramento**: Fornece ferramentas para monitorar a qualidade e o drift das features.
5. **Documentação**: Mantém um registro centralizado de metadados sobre as features.

### Desafio para os Alunos

Como desafio, tente implementar a integração com Feast seguindo estes passos:

1. Instale o Feast usando `pip install feast`.
2. Crie um repositório de features com `feast init feature_repo`.
3. Defina suas entidades e features no arquivo `feature_definitions.py`.
4. Modifique o pipeline Airflow para incluir a integração com Feast.
5. Teste a recuperação de features para treinamento e inferência.

Este desafio é uma excelente oportunidade para explorar ferramentas avançadas de MLOps e entender como o feature engineering se encaixa em um fluxo de trabalho de machine learning de produção.

## Conclusão

Neste tutorial, exploramos o processo completo de feature engineering, desde a compreensão dos conceitos básicos até a implementação de um pipeline ETL automatizado com Apache Airflow.

### Recapitulando o que aprendemos:

1. **Conceitos de Feature Engineering**:
   - Importância da feature engineering para modelos de machine learning
   - Processo de transformação de dados brutos em features úteis

2. **Técnicas de Feature Engineering com scikit-learn**:
   - Criação de features temporais
   - Aplicação de polynomial features
   - One-hot encoding para variáveis categóricas
   - Criação de features de interação
   - Seleção de features baseada em variância

3. **Implementação de Pipeline ETL com Apache Airflow**:
   - Estrutura de um DAG no Airflow
   - Extração de dados de APIs
   - Transformação de dados com técnicas de feature engineering
   - Carregamento de dados em bancos de dados

4. **Integração com Feast Feature Store (Desafio)**:
   - Conceito de feature store
   - Benefícios da utilização de um feature store
   - Implementação básica com Feast

### Próximos Passos

Para continuar seu aprendizado em feature engineering e pipelines de dados:

1. **Experimente com diferentes técnicas de feature engineering**:
   - Transformações não-lineares
   - Técnicas de redução de dimensionalidade (PCA, t-SNE)
   - Métodos de seleção de features baseados em modelos

2. **Explore recursos avançados do Apache Airflow**:
   - Sensores e gatilhos
   - Branching e condicionais
   - Paralelismo e pools

3. **Aprofunde-se em MLOps**:
   - Implementação completa com Feast
   - Monitoramento de modelos e features
   - Integração com plataformas de ML

Lembre-se: a feature engineering é tanto uma ciência quanto uma arte. A prática e a experimentação são essenciais para desenvolver boas intuições sobre quais transformações funcionarão melhor para seus dados específicos.

## Exercício Prático

Para consolidar o conhecimento adquirido, tente o seguinte exercício:

1. Expanda o conjunto de dados climáticos simulado para incluir mais variáveis (ex: pressão atmosférica, visibilidade, ponto de orvalho).
2. Crie novas features baseadas em conhecimento de domínio sobre meteorologia.
3. Implemente técnicas adicionais de feature engineering não abordadas no tutorial.
4. Avalie a importância das features criadas usando um modelo simples (ex: RandomForest).
5. Bônus: Tente implementar a integração com Feast conforme descrito no desafio.

Compartilhe seus resultados e insights com a turma!