In [None]:
# Fuzzy Decision Tree Classifier for RECS 2020 Dataset
# ==================================================
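# This notebook implements a custom Fuzzy Decision Tree classifier (scikit-fuzzy
# membership features fed into a scikit-learn decision tree), predicts energy
# efficiency classes, and, when a FINAL_CLASS column is available, also evaluates
# those fuzzy-refined classes against the same labels.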
#
# Inputs:
# - Processed dataset (data/processed/merged_with_efficiency.csv or merged_cleaned.csv)
# Outputs:
# - Trained model, evaluation metrics, visualizations
#
# Dependencies: pandas, numpy, scikit-fuzzy, scikit-learn, seaborn, matplotlib

In [None]:
import pandas as pd
import numpy as np
import os
import skfuzzy as fuzz
from sklearn.tree import DecisionTreeClassifier, export_text
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# Project directories, resolved relative to the notebook's working directory
# (the project root is taken to be the parent of the current directory).
BASE_DIR = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
DATA_DIR = os.path.join(BASE_DIR, "data")
PROCESSED_DIR = os.path.join(BASE_DIR, "data", "processed")

In [None]:
# Define Fuzzy Decision Tree class
class FuzzyDecisionTree(BaseEstimator, ClassifierMixin):
    """Decision tree classifier trained on fuzzy-membership features.

    Raw columns are converted into the inputs listed in ``self.features``:
    triangular low/medium/high memberships for energy use and income, housing
    percentages divided by 100, climate indicator columns, and 0/1 flags that
    are set when an "older than" percentage exceeds 50. A scikit-learn
    ``DecisionTreeClassifier`` is then fitted on those inputs.

    Parameters
    ----------
    **tree_params
        Keyword arguments forwarded unchanged to ``DecisionTreeClassifier``.

    Notes
    -----
    ``BaseEstimator.get_params`` only reports parameters that are named in the
    ``__init__`` signature and skips ``**kwargs``. Because ``sklearn.base.clone``
    (used by ``cross_val_score``, ``GridSearchCV``, ...) rebuilds an estimator
    from ``get_params()``, the tree hyperparameters would be dropped on every
    clone. ``get_params``/``set_params`` are therefore overridden to expose the
    stored keyword arguments.
    """

    def __init__(self, **tree_params):
        # Kept so the hyperparameters survive get_params()/clone() round trips.
        self.tree_params = dict(tree_params)
        self.tree = DecisionTreeClassifier(**tree_params)
        self.features = [
            'energy_low', 'energy_medium', 'energy_high',
            'income_low', 'income_medium', 'income_high',
            'housing_single_family', 'housing_apt',
            'climate_cold', 'climate_hot_humid', 'climate_mixed_humid',
            'home_old', 'ac_old', 'heater_old', 'water_heater_old'
        ]

    def get_params(self, deep=True):
        """Return the keyword arguments this estimator was constructed with.

        ``deep`` is accepted for scikit-learn compatibility; the parameters are
        plain tree hyperparameters, so there is nothing nested to expand.
        """
        return dict(self.tree_params)

    def set_params(self, **params):
        """Update tree hyperparameters and return ``self``.

        The names are validated by ``DecisionTreeClassifier.set_params`` before
        they are recorded, so an unknown name raises there.
        """
        if params:
            self.tree.set_params(**params)
            self.tree_params.update(params)
        return self

    def _triangular_memberships(self, val, min_val, mean_val, max_val):
        """Return low/medium/high triangular memberships of ``val``.

        ``val`` may be a scalar or a 1-D array. The universe is 100 evenly
        spaced points on ``[min_val, max_val]``; "low" peaks at ``min_val``,
        "medium" at ``mean_val`` and "high" at ``max_val``.
        """
        x = np.linspace(min_val, max_val, 100)
        low = fuzz.trimf(x, [min_val, min_val, mean_val])
        medium = fuzz.trimf(x, [min_val, mean_val, max_val])
        high = fuzz.trimf(x, [min_val, max_val, max_val])
        # interp_membership treats points outside the universe as zero
        # membership by default, so an unseen value above the training maximum
        # would be neither low, medium nor high. Clamp to the universe so such
        # values saturate at the nearest edge instead.
        val = np.clip(val, min_val, max_val)
        return {
            'low': fuzz.interp_membership(x, low, val),
            'medium': fuzz.interp_membership(x, medium, val),
            'high': fuzz.interp_membership(x, high, val)
        }

    def fuzz_energy(self, val, min_val, mean_val, max_val):
        """Fuzzy memberships (``low``/``medium``/``high``) of an energy value."""
        return self._triangular_memberships(val, min_val, mean_val, max_val)

    def fuzz_income(self, val, min_val, mean_val, max_val):
        """Fuzzy memberships (``low``/``medium``/``high``) of an income value."""
        return self._triangular_memberships(val, min_val, mean_val, max_val)

    def fuzzify_row(self, row, energy_params, income_params):
        """Convert one raw row into a dict keyed by the names in ``self.features``.

        ``energy_params`` and ``income_params`` are ``(min, mean, max)`` tuples.
        Optional columns that are absent from ``row`` default to 0.
        """
        energy_fuzzy = self.fuzz_energy(row['ENERGY_CONSUMPTION_PER_SQFT'], *energy_params)
        income_fuzzy = self.fuzz_income(row['Pct_INCOME_MORE_THAN_150K'], *income_params)
        return {
            'energy_low': energy_fuzzy['low'],
            'energy_medium': energy_fuzzy['medium'],
            'energy_high': energy_fuzzy['high'],
            'income_low': income_fuzzy['low'],
            'income_medium': income_fuzzy['medium'],
            'income_high': income_fuzzy['high'],
            'housing_single_family': row.get('Pct_HOUSING_SINGLE_FAMILY_HOME_DETACHED', 0) / 100,
            'housing_apt': row.get('Pct_HOUSING_APT_MORE_THAN_5_UNITS', 0) / 100,
            'climate_cold': row.get('CLIMATE_Cold', 0),
            'climate_hot_humid': row.get('CLIMATE_Hot-Humid', 0),
            'climate_mixed_humid': row.get('CLIMATE_Mixed-Humid', 0),
            'home_old': 1 if row.get('Pct_BUILT_BEFORE_1950', 0) > 50 else 0,
            'ac_old': 1 if row.get('Pct_MAIN_AC_AGE_OLDER_THAN_20', 0) > 50 else 0,
            'heater_old': 1 if row.get('Pct_MAIN_HEAT_AGE_OLDER_THAN_20', 0) > 50 else 0,
            'water_heater_old': 1 if row.get('Pct_MAIN_WATER_HEAT_AGE_OLDER_THAN_20', 0) > 50 else 0
        }

    def fuzzify(self, X, energy_params, income_params):
        """Build the fuzzy feature frame for every row of ``X``.

        Produces the same features as ``fuzzify_row`` but works column-wise, so
        the universe and membership functions are built once per column rather
        than once per row, and an empty ``X`` yields an empty frame that still
        has every feature column.

        Raises
        ------
        ValueError
            If the energy or income column is missing from ``X``.
        """
        required_cols = ['ENERGY_CONSUMPTION_PER_SQFT', 'Pct_INCOME_MORE_THAN_150K']
        missing_cols = [col for col in required_cols if col not in X.columns]
        if missing_cols:
            raise ValueError(f"Missing required columns: {missing_cols}")

        def column(name):
            # Same fallback as row.get(name, 0): an absent column is all zeros.
            if name in X.columns:
                return X[name].to_numpy(dtype=float)
            return np.zeros(len(X), dtype=float)

        def majority_flag(name):
            # 1 where the percentage exceeds 50, else 0 (NaN compares False -> 0).
            return (column(name) > 50).astype(int)

        energy_fuzzy = self.fuzz_energy(column('ENERGY_CONSUMPTION_PER_SQFT'), *energy_params)
        income_fuzzy = self.fuzz_income(column('Pct_INCOME_MORE_THAN_150K'), *income_params)

        fuzzy_df = pd.DataFrame({
            'energy_low': energy_fuzzy['low'],
            'energy_medium': energy_fuzzy['medium'],
            'energy_high': energy_fuzzy['high'],
            'income_low': income_fuzzy['low'],
            'income_medium': income_fuzzy['medium'],
            'income_high': income_fuzzy['high'],
            'housing_single_family': column('Pct_HOUSING_SINGLE_FAMILY_HOME_DETACHED') / 100,
            'housing_apt': column('Pct_HOUSING_APT_MORE_THAN_5_UNITS') / 100,
            'climate_cold': column('CLIMATE_Cold'),
            'climate_hot_humid': column('CLIMATE_Hot-Humid'),
            'climate_mixed_humid': column('CLIMATE_Mixed-Humid'),
            'home_old': majority_flag('Pct_BUILT_BEFORE_1950'),
            'ac_old': majority_flag('Pct_MAIN_AC_AGE_OLDER_THAN_20'),
            'heater_old': majority_flag('Pct_MAIN_HEAT_AGE_OLDER_THAN_20'),
            'water_heater_old': majority_flag('Pct_MAIN_WATER_HEAT_AGE_OLDER_THAN_20')
        }, index=X.index)
        return fuzzy_df[self.features]

    def fit(self, X, y, energy_params=None, income_params=None):
        """Fit the tree on the fuzzified version of ``X``.

        Parameters
        ----------
        X : pandas.DataFrame
            Must contain ``ENERGY_CONSUMPTION_PER_SQFT`` and
            ``Pct_INCOME_MORE_THAN_150K``; the other columns read by
            ``fuzzify`` are optional.
        y : array-like
            Class labels.
        energy_params, income_params : tuple, optional
            ``(min, mean, max)`` used to build the membership functions.
            When omitted they are computed from the corresponding column of ``X``.

        Returns
        -------
        self
        """
        if energy_params is None:
            energy_col = X['ENERGY_CONSUMPTION_PER_SQFT']
            energy_params = (energy_col.min(), energy_col.mean(), energy_col.max())
        if income_params is None:
            income_col = X['Pct_INCOME_MORE_THAN_150K']
            income_params = (income_col.min(), income_col.mean(), income_col.max())

        # Remembered so predict() fuzzifies with the training-time universe.
        self.energy_params_ = energy_params
        self.income_params_ = income_params
        X_fuzzy = self.fuzzify(X, energy_params, income_params)
        self.tree.fit(X_fuzzy, y)
        # Expose the fitted labels under the conventional scikit-learn name.
        self.classes_ = self.tree.classes_
        return self

    def predict(self, X):
        """Predict class labels for ``X`` using the parameters stored by ``fit``."""
        X_fuzzy = self.fuzzify(X, self.energy_params_, self.income_params_)
        return self.tree.predict(X_fuzzy)

    def rules(self):
        """Return the fitted tree as text, labelled with the fuzzy feature names."""
        return export_text(self.tree, feature_names=self.features)

In [None]:
# Tunable settings for the split, the tree and cross-validation
TEST_SIZE = 0.2
RANDOM_STATE = 42
MAX_DEPTH = 4
CV_FOLDS = 5

# Load data: prefer the file that already carries efficiency classes,
# fall back to the cleaned merge otherwise.
input_path = os.path.join(PROCESSED_DIR, "merged_with_efficiency.csv")
if not os.path.exists(input_path):
    print(f"{input_path} not found, trying merged_cleaned.csv...")
    input_path = os.path.join(PROCESSED_DIR, "merged_cleaned.csv")
    if not os.path.exists(input_path):
        raise FileNotFoundError(f"Input file {input_path} not found. Run fuzzy_logic.ipynb or preprocessing.ipynb first.")
df = pd.read_csv(input_path)

# Print loaded columns for debugging
print("Loaded columns:", df.columns.tolist())

# Debug: Print FUZZY_OUTPUT sample if present
if 'FUZZY_OUTPUT' in df.columns:
    print("Sample FUZZY_OUTPUT:", df['FUZZY_OUTPUT'].head().tolist())

# Define features.
# The housing and equipment-age columns are read by FuzzyDecisionTree when it
# builds its fuzzy inputs; if they are not passed through here, the matching
# tree inputs are constant 0. Columns absent from the file are filtered out below.
features = [
    'ENERGY_CONSUMPTION_PER_SQFT', 'Pct_INCOME_MORE_THAN_150K',
    'CLIMATE_Cold', 'CLIMATE_Hot-Humid', 'CLIMATE_Mixed-Humid',
    'Pct_HOUSING_SINGLE_FAMILY_HOME_DETACHED', 'Pct_HOUSING_APT_MORE_THAN_5_UNITS',
    'Pct_BUILT_BEFORE_1950', 'Pct_MAIN_AC_AGE_OLDER_THAN_20',
    'Pct_MAIN_HEAT_AGE_OLDER_THAN_20', 'Pct_MAIN_WATER_HEAT_AGE_OLDER_THAN_20'
]
available_features = [col for col in features if col in df.columns]

# Check for Efficiency_Class and compute if missing
if 'Efficiency_Class' not in df.columns:
    print("Efficiency_Class missing, computing from ENERGY_CONSUMPTION_PER_SQFT...")
    energy = df['ENERGY_CONSUMPTION_PER_SQFT']
    # Compute the tercile cut-offs once instead of once per row.
    q33 = energy.quantile(0.33)
    q66 = energy.quantile(0.66)
    # Lower consumption per square foot means higher efficiency.
    df['Efficiency_Class'] = np.where(
        energy < q33, "High",
        np.where(energy <= q66, "Moderate", "Low")
    )
    # NOTE(review): in this branch the label is a direct function of
    # ENERGY_CONSUMPTION_PER_SQFT, which is also a model input, so the
    # accuracies reported below will be optimistic.

# Prepare data
X = df[available_features]
y = df['Efficiency_Class']
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=TEST_SIZE, random_state=RANDOM_STATE
)

# Train fuzzy decision tree
fdt = FuzzyDecisionTree(max_depth=MAX_DEPTH, random_state=RANDOM_STATE)
fdt.fit(X_train, y_train)

# Evaluate model
y_pred = fdt.predict(X_test)
print("Fuzzy Decision Tree Accuracy:", accuracy_score(y_test, y_pred))
print("\nClassification Report:\n", classification_report(y_test, y_pred))

# Evaluate fuzzy-refined classes if available
if 'FINAL_CLASS' in df.columns:
    y_fuzzy = df.loc[X_test.index, 'FINAL_CLASS']
    print("\nFuzzy-Refined Accuracy:", accuracy_score(y_test, y_fuzzy))
    print("\nFuzzy-Refined Classification Report:\n", classification_report(y_test, y_fuzzy))
else:
    print("FINAL_CLASS not found, skipping fuzzy-refined evaluation.")

# Confusion matrix visualization
class_labels = fdt.tree.classes_
conf_matrix = confusion_matrix(y_test, y_pred, labels=class_labels)
fig, ax = plt.subplots(figsize=(6, 4))
sns.heatmap(conf_matrix, annot=True, fmt='d', xticklabels=class_labels,
            yticklabels=class_labels, cmap="Blues", ax=ax)
ax.set_xlabel("Predicted")
ax.set_ylabel("Actual")
ax.set_title("Fuzzy Decision Tree Confusion Matrix")
plt.show()

# Cross-validation
# NOTE(review): cross_val_score clones the estimator from its get_params();
# confirm FuzzyDecisionTree reports its tree hyperparameters there, otherwise
# each fold is fitted with default tree settings rather than the ones above.
cv_scores = cross_val_score(fdt, X, y, cv=CV_FOLDS, scoring='accuracy')
print("Cross-Validation Scores:", cv_scores)
print("Average CV Accuracy:", cv_scores.mean())

# Print rules
print("\nFuzzy Decision Tree Rules:\n", fdt.rules())