In [1]:
import pandas as pd
import numpy as np
import duckdb
from sklearn.linear_model import LinearRegression

In [2]:
# Caminho do banco
db_path = "../../data/duckdb/database.duckdb"

# Conexão com o banco DuckDB
con = duckdb.connect(db_path)


# Carrega os dados da camada bronze
df = con.execute("""
    SELECT 
          CG.*
        , CC.cluster
                 
    FROM gold.consumo_geral AS CG
    
    INNER JOIN output.clusterizacao_cliente AS CC ON
        CG.client_id = CC.client_id
""").df()

# Feature Engineering

In [3]:
def gerar_lag_features(df: pd.DataFrame, coluna_alvo: str, num_lags: int) -> pd.DataFrame:
    """
    Gera lag features para a coluna alvo em um DataFrame.

    Parâmetros:
    df (pd.DataFrame): DataFrame original.
    coluna_alvo (str): Nome da coluna alvo para gerar os lags.
    num_lags (int): Quantidade de lags a serem geradas (ex: 12 gera lag_1 até lag_12).

    Retorna:
    pd.DataFrame: DataFrame com as novas colunas de lag adicionadas.
    """
    df_copy = df.copy()
    for lag in range(1, num_lags + 1):
        df_copy[f'{coluna_alvo}_lag_{lag}'] = df_copy[coluna_alvo].shift(lag)
    return df_copy

def criar_features_climaticas(df):
    """
    Cria colunas de amplitude térmica, faixa de temperatura e faixa de umidade em um DataFrame.

    Espera que o DataFrame tenha colunas:
        - temperature_max
        - temperature_min
        - temperature
        - humidity
    """
    df = df.copy()

    df['temperature_max'] = df['temperature'].max()
    df['temperature_min'] = df['temperature'].min()

    # Amplitude térmica
    df['amplitude_termica'] = df['temperature_max'] - df['temperature_min']

    # Faixa de temperatura média
    def classificar_temp(temp):
        if temp < 24.356250:
            return 'baixa'
        elif temp <= 25.813333:
            return 'media'
        else:
            return 'alta'

    df['faixa_temperatura'] = df['temperature'].apply(classificar_temp)

    # Faixa de umidade
    def classificar_umidade(h):
        if h < 58.623529:
            return 'baixa'
        elif h <= 61.540625:
            return 'media'
        else:
            return 'alta'

    df['faixa_umidade'] = df['humidity'].apply(classificar_umidade)

    return df

def criar_features_clientes(df):
    """
    Espera DataFrame com colunas: client_id, date (datetime) e consumption_kwh.
    Retorna um novo DataFrame com client_id, variabilidade_consumo_cliente e tendencia_consumo_cliente.
    """
    df = df.copy()
    df['date_ordinal'] = df['date'].apply(lambda x: x.toordinal())  # Facilita a regressão linear
    
    resultados = []

    for client, grupo in df.groupby('client_id'):
        # Variabilidade do consumo
        variabilidade = grupo['consumption_kwh'].std()

        # Tendência (regressão linear)
        if len(grupo) >= 2:
            X = grupo[['date_ordinal']]
            y = grupo['consumption_kwh']
            modelo = LinearRegression().fit(X, y)
            tendencia = modelo.coef_[0]  # Inclinação da reta
        else:
            tendencia = np.nan

        resultados.append({
            'client_id': client,
            'variabilidade_consumo_cliente': variabilidade,
            'tendencia_consumo_cliente': tendencia
        })

    return pd.DataFrame(resultados)


In [4]:
df = criar_features_climaticas(df)

# 1. Geração das features por cliente
df_features_clientes = criar_features_clientes(df)

# 2. Merge com o dataframe original de consumo
df = df.merge(
    df_features_clientes,
    on='client_id',
    how='left'
)

In [5]:
faixa_temperatura = [f'temperatura_{temp}' for temp in df['faixa_temperatura'].unique().tolist()]

# Codificação da variável categórica
df = pd.get_dummies(df, columns=['faixa_temperatura'], prefix='temperatura', prefix_sep='_')

df[faixa_temperatura] = df[faixa_temperatura].astype(int)

faixa_umidade = [f'umidade_{temp}' for temp in df['faixa_umidade'].unique().tolist()]

# Codificação da variável categórica
df = pd.get_dummies(df, columns=['faixa_umidade'], prefix='umidade', prefix_sep='_')

df[faixa_umidade] = df[faixa_umidade].astype(int)

In [6]:
df = gerar_lag_features(df, 'temperature', 14)
df = gerar_lag_features(df, 'humidity', 14)
df = gerar_lag_features(df, 'consumption_kwh', 14)

df = df.fillna(0)

# Salvar Features

In [7]:
def gerar_ddl_duckdb(df: pd.DataFrame, nome_tabela: str, schema: str = "feature") -> str:
    """
    Gera uma string SQL para criar uma tabela no DuckDB com base em um DataFrame Pandas.

    Parâmetros:
        df (pd.DataFrame): DataFrame de entrada.
        nome_tabela (str): Nome da tabela a ser criada.
        schema (str): Nome do schema (default = "feature").

    Retorna:
        str: Comando CREATE OR REPLACE TABLE no formato DuckDB.
    """
    tipo_duckdb = {
        "int64": "BIGINT",
        "int32": "INTEGER",
        "float64": "DOUBLE",
        "float32": "FLOAT",
        "object": "VARCHAR",
        "bool": "BOOLEAN",
        "datetime64[ns]": "TIMESTAMP"
    }

    colunas_sql = []

    for col in df.columns:
        tipo_pandas = str(df[col].dtype)
        tipo_sql = tipo_duckdb.get(tipo_pandas, "VARCHAR")  # default: VARCHAR
        colunas_sql.append(f"    {col} {tipo_sql}")

    colunas_str = ",\n".join(colunas_sql)

    ddl = f"""CREATE OR REPLACE TABLE {schema}.{nome_tabela} (\n{colunas_str}\n);"""
    return ddl


In [8]:
ddl_sql = gerar_ddl_duckdb(df, nome_tabela="previsao_consumo", schema="feature")

In [9]:
con.execute(ddl_sql)

<duckdb.duckdb.DuckDBPyConnection at 0x1f38eae8f70>

In [10]:
# Limpa dados se as tabelas já existirem
con.execute("DELETE FROM feature.previsao_consumo")

<duckdb.duckdb.DuckDBPyConnection at 0x1f38eae8f70>

In [11]:
# Registra como tabelas temporárias
con.register("df", df)

<duckdb.duckdb.DuckDBPyConnection at 0x1f38eae8f70>

In [12]:
# Insere os dados nas tabelas gold
con.execute("INSERT INTO feature.previsao_consumo SELECT * FROM df")

<duckdb.duckdb.DuckDBPyConnection at 0x1f38eae8f70>

In [13]:
con.close()