# Knowledge Extraction Notebook



In [1]:
import sys, subprocess, warnings
warnings.filterwarnings('ignore')
def ensure(pkg, conda=False):
    try:
        __import__(pkg)
    except Exception:
        if conda:
            try:
                subprocess.check_call(["conda","install","-c","conda-forge",pkg,"-y"])
            except Exception:
                subprocess.check_call([sys.executable,"-m","pip","install",pkg])
        else:
            subprocess.check_call([sys.executable,"-m","pip","install",pkg])
ensure('pandas'); ensure('numpy'); ensure('matplotlib'); ensure('statsmodels'); ensure('scikit_learn')
try:
    import pmdarima
except Exception:
    try: subprocess.check_call(["conda","install","-c","conda-forge","pmdarima","-y"])
    except Exception: subprocess.check_call([sys.executable,"-m","pip","install","pmdarima"])
try:
    import pygam
except Exception:
    subprocess.check_call([sys.executable,"-m","pip","install","pygam"]) 
try:
    import ruptures
except Exception:
    subprocess.check_call([sys.executable,"-m","pip","install","ruptures"]) 
print('Python exe:', sys.executable)

Python exe: c:\Users\acer\Desktop\fnn-pso\.venv\Scripts\python.exe


In [2]:
import pandas as pd, numpy as np, matplotlib.pyplot as plt
from pathlib import Path
CSV_PATH = "dataset_meteo_com_consumo.csv" 
OUT_DIR = Path("outputs_knowledge"); OUT_DIR.mkdir(exist_ok=True)
df = pd.read_csv(CSV_PATH)
df.columns = [c.strip().lower().replace(' ','_') for c in df.columns]
date_col = 'date' if 'date' in df.columns else ('data' if 'data' in df.columns else None)
assert date_col is not None, 'Coluna de data não encontrada.'
df[date_col] = pd.to_datetime(df[date_col], errors='coerce', dayfirst=True)
df = df.sort_values(date_col).reset_index(drop=True)
TARGET='consumo_gwh'
assert TARGET in df.columns, f"{TARGET} não encontrada"
meteo_candidates=['tmean_c','tmax_c','tmin_c','hdd18','cdd22','precip_mm','rad_solar','humidade_relativa','nebulosidade_media','sunshine_sec','sunshine_h','wind_speed_max','wind_gusts_max','day_length_h','day_length_hours']
if 'sunshine_sec' in df.columns and 'sunshine_h' not in df.columns:
    df['sunshine_h']=pd.to_numeric(df['sunshine_sec'],errors='coerce')/3600
meteo_cols=[c for c in meteo_candidates if c in df.columns]
pd.DataFrame({'column':df.columns,'dtype':[str(df[c].dtype) for c in df.columns]}).to_csv(OUT_DIR/'schema_columns.csv',index=False)
df[[date_col,TARGET]+meteo_cols].head()

Unnamed: 0,date,consumo_gwh,tmean_c,tmax_c,tmin_c,hdd18,cdd22,precip_mm,rad_solar,humidade_relativa,nebulosidade_media,sunshine_sec,sunshine_h,wind_speed_max,wind_gusts_max,day_length_hours
0,2015-01-01,119.0,9.1,14.7,4.9,8.9,0.0,0.0,9.68,79,0,30286.11,8.412808,11.9,19.8,9.43
1,2015-01-01,119.0,8.5,13.3,4.7,9.5,0.0,0.0,10.04,77,0,30701.15,8.528097,9.8,16.9,9.53
2,2015-01-01,119.0,5.1,14.8,-0.6,12.9,0.0,0.0,8.5,81,39,29536.07,8.204464,11.6,22.3,9.25
3,2015-01-01,119.0,4.5,14.0,-1.0,13.5,0.0,0.0,8.87,82,22,29503.68,8.195467,10.5,23.8,9.25
4,2015-01-01,119.0,4.6,14.2,-1.1,13.4,0.0,0.0,8.87,81,22,29536.07,8.204464,10.5,23.8,9.27


In [3]:
def add_lags(frame, cols, lags=(1,2,3,7)):
    f=frame.copy()
    for c in cols:
        if c in f.columns:
            for L in lags:
                f[f"{c}_lag{L}"]=f[c].shift(L)
    return f
if 'hdd18' not in df.columns and 'tmean_c' in df.columns:
    df['hdd18']=(18-df['tmean_c']).clip(lower=0)
if 'cdd22' not in df.columns and 'tmean_c' in df.columns:
    df['cdd22']=(df['tmean_c']-22).clip(lower=0)
exo_base=[c for c in ['tmean_c','hdd18','cdd22','rad_solar','humidade_relativa'] if c in df.columns]
df=add_lags(df, exo_base, lags=(1,2,3,7))
lag_cols=[c for c in df.columns if any(c.startswith(b+'_lag') for b in exo_base)]
work=df[['date',TARGET]+lag_cols].dropna().copy() if 'date' in df.columns else df[['data',TARGET]+lag_cols].dropna().copy()
work.to_csv(OUT_DIR/'model_table_lags.csv', index=False)
len(work)

77393

In [4]:
import pandas as pd
import numpy as np
from statsmodels.tsa.statespace.sarimax import SARIMAX
import matplotlib.pyplot as plt

y = work[TARGET].astype(float).values
X = work[lag_cols].astype(float).values if lag_cols else None
order=(1,1,1); seasonal_order=(1,0,1,7)

res = SARIMAX(
    y, exog=X, order=order, seasonal_order=seasonal_order,
    enforce_stationarity=False, enforce_invertibility=False
).fit(disp=False)

print(res.summary())


param_names = np.array(res.param_names, dtype=object)  
coef = np.array(res.params, dtype=float)               
pvals = np.array(res.pvalues, dtype=float)             

params = pd.DataFrame({"coef": coef, "pvalue": pvals}, index=param_names)
params.index.name = "param"
params.to_csv(OUT_DIR / 'arimax_params.csv')


pred = res.get_prediction() 
sf = pred.summary_frame()    


date_col = 'date' if 'date' in work.columns else ('data' if 'data' in work.columns else None)
if date_col is None:
    
    dates = pd.RangeIndex(start=0, stop=len(y), step=1)
else:
    dates = pd.to_datetime(work[date_col])

pred_df = pd.DataFrame({
    'date': dates,
    'y': y,
    'yhat': sf['mean'].to_numpy(),
    'lo': sf.get('mean_ci_lower', sf.iloc[:, 0]).to_numpy(),
    'hi': sf.get('mean_ci_upper', sf.iloc[:, 1]).to_numpy(),
})
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import numpy as np

rmse = np.sqrt(mean_squared_error(pred_df['y'], pred_df['yhat']))
mae = mean_absolute_error(pred_df['y'], pred_df['yhat'])
r2  = r2_score(pred_df['y'], pred_df['yhat'])

metrics = pd.DataFrame({
    'metric': ['RMSE','MAE','R²'],
    'value': [rmse, mae, r2]
})
metrics.to_csv(OUT_DIR / 'arimax_metrics.csv', index=False)
print(metrics)


plt.figure(figsize=(10, 4))
plt.plot(pred_df['date'], pred_df['y'], label='obs')
plt.plot(pred_df['date'], pred_df['yhat'], label='fit')
plt.fill_between(pred_df['date'], pred_df['lo'], pred_df['hi'], alpha=0.2, label='95% CI')
plt.legend()
plt.title('ARIMAX in-sample')
plt.tight_layout()
plt.savefig(OUT_DIR / 'arimax_fit_ci.png', dpi=150)
plt.close()


                                     SARIMAX Results                                     
Dep. Variable:                                 y   No. Observations:                77393
Model:             SARIMAX(1, 1, 1)x(1, 0, 1, 7)   Log Likelihood             -188211.118
Date:                           Mon, 03 Nov 2025   AIC                         376472.236
Time:                                   11:55:14   BIC                         376703.649
Sample:                                        0   HQIC                        376543.283
                                         - 77393                                         
Covariance Type:                             opg                                         
                 coef    std err          z      P>|z|      [0.025      0.975]
------------------------------------------------------------------------------
x1            -0.0015      0.017     -0.090      0.928      -0.034       0.031
x2            -0.0143      0.020     -0.729

In [5]:

from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from functools import reduce
from pygam import LinearGAM, s, l  


TARGET = 'consumo_gwh' 



tcol = 'date' if 'date' in df.columns else 'data'
gam_cols = [c for c in ['tmean_c','hdd18','cdd22','rad_solar','humidade_relativa'] if c in df.columns]

gdf = df[[tcol, TARGET] + gam_cols].copy()

gdf[tcol] = pd.to_datetime(gdf[tcol], errors='coerce')
gdf = gdf.dropna(subset=[tcol, TARGET])


gdf['doy'] = gdf[tcol].dt.dayofyear.astype(float)
gdf['doy01'] = (gdf['doy'] - 1) / 365.25  # ~[0,1)

H = 3  
for h in range(1, H+1):
    gdf[f'sin{h}'] = np.sin(2 * np.pi * h * gdf['doy01'])
    gdf[f'cos{h}'] = np.cos(2 * np.pi * h * gdf['doy01'])

fourier_cols = [f'sin{h}' for h in range(1, H+1)] + [f'cos{h}' for h in range(1, H+1)]


feat_cols = gam_cols + fourier_cols
gdf = gdf.dropna(subset=feat_cols) 
X_gam = gdf[feat_cols].astype(float).values
y_gam = gdf[TARGET].astype(float).values


terms = [s(i) for i in range(len(gam_cols))] + \
        [l(i) for i in range(len(gam_cols), len(feat_cols))]


if not terms:
    raise ValueError("Nenhum termo foi definido. Verifique se há colunas em gam_cols ou defina H>0.")
term_list = reduce(lambda a, b: a + b, terms)


assert X_gam.shape[1] == len(terms), f"Nº colunas X ({X_gam.shape[1]}) != nº termos ({len(terms)})"
if not (np.isfinite(X_gam).all() and np.isfinite(y_gam).all()):
    raise ValueError("Existem NaN/Inf em X_gam ou y_gam. Limpe os dados antes do ajuste.")


gam = LinearGAM(term_list).fit(X_gam, y_gam)


OUT_DIR.mkdir(parents=True, exist_ok=True)
pd.DataFrame([{
    'pseudo_r2': gam.statistics_['pseudo_r2'],
    'edof': gam.statistics_['edof'],
    'gcv': gam.statistics_['GCV']
}]).to_csv(OUT_DIR / 'gam_summary.csv', index=False)


for i, name in enumerate(feat_cols):
    XX = gam.generate_X_grid(term=i)
    plt.figure(figsize=(5, 3))
    plt.plot(XX[:, i], gam.partial_dependence(term=i, X=XX))
    plt.title(f'Shape: {name}')
    plt.tight_layout()
    plt.savefig(OUT_DIR / f'gam_shape_{name}.png', dpi=150)
    plt.close()


yhat_gam = gam.predict(X_gam)
gam_pred_df = pd.DataFrame({
    'date': gdf[tcol].values,
    'y': y_gam,
    'yhat': yhat_gam,
    'resid': y_gam - yhat_gam
})
gam_pred_df.to_csv(OUT_DIR / 'gam_in_sample_pred.csv', index=False)

print("sucess.")
print(f"Saved at: {OUT_DIR.resolve()}")


GAM treinado com sucesso.
Arquivos salvos em: C:\Users\acer\Desktop\fnn-pso\Mestrado Pedro\outputs_knowledge


In [6]:

import numpy as np, pandas as pd
from pathlib import Path

def _read_csv_robust(path):
    trials = [(e,s) for e in ("utf-8","latin1") for s in (",",";","\t")]
    last=None
    for enc,sep in trials:
        try:
            df=pd.read_csv(path,encoding=enc,sep=sep)
            if df.shape[1]>=6 and df.shape[0]>=100:
                print(f"[info] Parsed CSV enc={enc} sep='{sep}' shape={df.shape}")
                return df
        except Exception as e:
            last=e
    raise RuntimeError(f"Could not parse CSV {path}. Last error: {last}")


try:
    df
except NameError:
    CSV_PATH = "dataset_meteo_com_consumo.csv"  
    df = _read_csv_robust(CSV_PATH)


df.columns = [c.strip().lower().replace(" ","_") for c in df.columns]


TARGET = None
for cand in ["consumo_gwh","consumo","target","gwh","consumo_diario_gwh"]:
    if cand in df.columns:
        TARGET = cand
        break
assert TARGET is not None, "Dont found(TARGET)."


import re
def compute_support(rule_series: pd.Series, X: pd.DataFrame) -> pd.Series:
    """Percentage of lines that satisfy each rule (rules with ‘&’ and simple comparators)."""
    sup = []
    for rule in rule_series.astype(str):
        mask = np.ones(len(X), dtype=bool)
        for part in re.split(r"\s*&\s*", rule):
            m = re.match(r"\s*([\w_]+)\s*(<=|>=|<|>)\s*([-+]?\d+(\.\d+)?)\s*", part)
            if m:
                feat, op, thr = m.group(1), m.group(2), float(m.group(3))
                if feat not in X.columns:
                    mask &= False
                else:
                    s = pd.to_numeric(X[feat], errors="coerce")
                    if op == "<":   mask &= (s < thr)
                    elif op == "<=": mask &= (s <= thr)
                    elif op == ">":  mask &= (s > thr)
                    elif op == ">=": mask &= (s >= thr)
            else:
                m2 = re.match(r"\s*([\w_]+)\s*==\s*([01])\s*", part)
                if m2:
                    feat, val = m2.group(1), int(m2.group(2))
                    if feat not in X.columns:
                        mask &= False
                    else:
                        mask &= (pd.to_numeric(X[feat], errors="coerce") == val)
        sup.append(mask.mean())
    return pd.Series(sup)


rules_out = Path("outputs_knowledge") / "rulefit_rules.csv"
rules_out.parent.mkdir(exist_ok=True, parents=True)

ok = False; impl = None
try:
    from rulefit import RuleFit            
    from sklearn.ensemble import RandomForestRegressor
    ok = True; impl = "rulefit"
except Exception:
    try:
        from imodels import RuleFitRegressor as RuleFit   
        from sklearn.ensemble import RandomForestRegressor
        ok = True; impl = "imodels"
    except Exception:
        pass

if ok:
 
    rcols = [c for c in ['tmean_c','hdd18','cdd22','rad_solar','humidade_relativa']
             if c in df.columns]
    assert rcols, "Nenhuma feature meteo base encontrada em df."
    rdf = df[[TARGET] + rcols].dropna().copy()
    Xr  = rdf[rcols].astype(float)
    yr  = rdf[TARGET].astype(float).values

   
    MAX_SAMPLES = 10000
    if len(Xr) > MAX_SAMPLES:
        idx = np.random.RandomState(0).choice(len(Xr), size=MAX_SAMPLES, replace=False)
        Xfit = Xr.iloc[idx].copy(); yfit = yr[idx].copy()
    else:
        Xfit = Xr; yfit = yr

   
    tree_gen = RandomForestRegressor(
        n_estimators=140,     
        max_depth=4,         
        min_samples_leaf=50, 
        n_jobs=-1,
        random_state=0
    )

   
    rf_kwargs = dict(
        tree_generator=tree_gen,
        tree_size=4,
        max_rules=500,
        sample_fract=0.5,
        random_state=0
    )
    
    rf = RuleFit(**{k:v for k,v in rf_kwargs.items()
                    if k in RuleFit.__init__.__code__.co_varnames})
    rf.fit(Xfit.values, yfit, feature_names=rcols)

    
    try:
        rules = rf.get_rules()
        
        coef_col = 'coef' if 'coef' in rules.columns else ('coefficient' if 'coefficient' in rules.columns else None)

        
        if 'type' in rules.columns:
            rules = rules[rules['type'] != 'linear']
        if coef_col is not None:
            rules = rules[rules[coef_col] != 0]

      
        if 'support' not in rules.columns:
            rules['support'] = compute_support(rules['rule'], Xr)

        
        rules['len'] = rules['rule'].str.count('&') + 1
        rules = rules[(rules['len'] <= 3) & (rules['support'] >= 0.02)]

        
        if coef_col is not None:
            rules['score'] = rules[coef_col].abs() * rules['support']
            rules = rules.sort_values('score', ascending=False)

        
        cols_to_save = ['rule','support','len'] + ([coef_col] if coef_col else [])
        rules[cols_to_save].to_csv(rules_out, index=False)
        print(f"[ok] Rules saved at: {rules_out}")
    except Exception as e:
        print("[warn] get_rules failed, saving feature importance.", e)
        imp = getattr(rf, 'feature_importances_', None)
        if imp is not None:
            pd.DataFrame({'feature': rcols, 'importance': imp}).to_csv(rules_out, index=False)
            print(f"[ok] Amounts saved in: {rules_out}")
else:
    rules_out.write_text("# RuleFit not available; install 'rulefit' or 'imodels'.\n", encoding="utf-8")
    print("[erro] Neither ‘rulefit’ nor ‘imodels’ available.")


[ok] Regras salvas em: outputs_knowledge\rulefit_rules.csv


In [7]:

from pathlib import Path
import numpy as np, pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.seasonal import STL
import ruptures as rpt


PERIOD = 7                 
USE_WEEKLY = True          
MAX_POINTS = 20000         
RUPTURES_MODEL = "l2"      
RUPTURES_JUMP = 7          
RUPTURES_MIN_SIZE = 7      
PENALTY = 10               
N_BKPS = None             


try:
    OUT_DIR
except NameError:
    OUT_DIR = Path("outputs_knowledge")
OUT_DIR.mkdir(parents=True, exist_ok=True)


tcol = "date" if "date" in df.columns else ("data" if "data" in df.columns else None)
assert tcol is not None, "Coluna de data não encontrada (date/data)."

tmp = df[[tcol, TARGET]].copy()
tmp[tcol] = pd.to_datetime(tmp[tcol], errors="coerce")
tmp = tmp.dropna(subset=[tcol, TARGET])


ts = (tmp.groupby(tmp[tcol].dt.date)[TARGET]
         .mean()                               
         .astype("float32")
         .sort_index())




if USE_WEEKLY and len(ts) > MAX_POINTS:
    ts = (ts.to_frame("y")
            .set_index(pd.to_datetime(ts.index))
            .resample("W")
            .mean()["y"]
            .astype("float32"))
    PERIOD = 52  

print(f"[info] Série para STL: n={len(ts)} pontos, periodicidade={PERIOD}")


stl = STL(ts, period=PERIOD, robust=True)
res = stl.fit()


res.trend.astype("float32").to_frame("trend").to_csv(OUT_DIR/"stl_trend.csv")
res.seasonal.astype("float32").to_frame("seasonal").to_csv(OUT_DIR/"stl_seasonal.csv")
res.resid.astype("float32").to_frame("resid").to_csv(OUT_DIR/"stl_resid.csv")


plt.figure(figsize=(10, 6))
plt.subplot(311); plt.plot(ts.index, ts.values); plt.title("Observed")
plt.subplot(312); plt.plot(ts.index, res.trend.values); plt.title("Trend")
plt.subplot(313); plt.plot(ts.index, res.seasonal.values); plt.title("Seasonal")
plt.tight_layout(); plt.savefig(OUT_DIR/"stl_components.png", dpi=150); plt.close()


signal = np.asarray(res.resid.dropna().values, dtype=np.float32)

algo = rpt.Pelt(model=RUPTURES_MODEL, min_size=RUPTURES_MIN_SIZE, jump=RUPTURES_JUMP).fit(signal)

if N_BKPS is None:
    bkps = algo.predict(pen=PENALTY)
else:
    bkps = algo.predict(n_bkps=N_BKPS)

pd.DataFrame({"break_index": bkps}).to_csv(OUT_DIR/"ruptures_breakpoints.csv", index=False)


plt.figure(figsize=(10, 4))
plt.plot(res.trend.index, res.trend.values, label="Trend", linewidth=1.3)
for i, b in enumerate(bkps[:-1]): 
    if 0 < b < len(res.trend):
        plt.axvline(res.trend.index[b], linestyle="--", alpha=0.4)
plt.title("Trend with detected change-points")
plt.tight_layout(); plt.savefig(OUT_DIR/"trend_with_bkps.png", dpi=150); plt.close()

print(f"[ok] STL and ruptures finalized. bkps={len(bkps)-1} segmentos → {OUT_DIR.resolve()}")


[info] Série para STL: n=1548 pontos, periodicidade=7
[ok] STL e ruptures finalizados. bkps=190 segmentos → C:\Users\acer\Desktop\fnn-pso\Mestrado Pedro\outputs_knowledge
