In [None]:
!pip install doubleml

In [2]:
import sys
import doubleml

print(f"Python version: {sys.version}")
print(f"DoubleML version: {doubleml.__version__}")

Python version: 3.12.11 (main, Jun  4 2025, 08:56:18) [GCC 11.4.0]
DoubleML version: 0.10.1


In [3]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import patsy
from doubleml import DoubleMLData, DoubleMLPLR
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LassoCV
from xgboost import XGBRegressor
import warnings
from typing import List, Dict

warnings.filterwarnings('ignore')

**Data preparation**

In [None]:
data = pd.read_csv("global_combined_HDD.csv")

data['wealth'] = np.where((data['wealth'] == -1) & (data['income'] != -1),
                         data['income'], data['wealth'])
data = data[data['wealth'] != -1]
data = data.drop(columns=["income", "total_HDD", "mean_temp", "threshold"])

data = data.sort_values(by=['GDLcode', 'time'])
data['carbon_emission_lag1'] = data.groupby('GDLcode')['carbon emission'].shift(1)

data = data.dropna()
data = data[data['HDD_days'] != 0]

#Select research areas
countries = ["AUT", "BEL", "BGR", "HRV", "CYP", "CZE", "DNK", "EST", "FIN", "FRA", "DEU", "GRC", "HUN", "IRL", "ITA", "LVA", "LTU", "LUX", "MLT", "NLD", "POL", "PRT", "ROU", "SVK", "SVN", "ESP", "SWE"]
#countries = ["GBR"]
#countries = ["CAN"]
#countries = ["USA"]
#countries = ["CHN"]
#countries = ["RUS"]

data = data[data['GDLcode'].str.contains('|'.join(countries), na=False)]

**Data cleansing**

In [None]:
df = data.copy()

Q1 = df['carbon emission'].quantile(0.25)
Q3 = df['carbon emission'].quantile(0.75)
IQR = Q3 - Q1

lower_bound = Q1 - 2 * IQR
upper_bound = Q3 + 2 * IQR

df['outlier'] = (df['carbon emission'] < lower_bound) | (df['carbon emission'] > upper_bound)

# delect outlier
data = df[~df['outlier']].drop(columns='outlier')

In [None]:
variables = ["HDD_days", "carbon_emission_lag1", "temperature", "radiation", "precipitation","surface temp", "windspeed",
             "GDP", "population", "age","education", "gender", "Hhsize", "wealth", "rural"]

df = data[variables + ['carbon emission']].copy()

df_long = pd.melt(df, id_vars='carbon emission', value_vars=variables,
                  var_name='Variable', value_name='Value')

df_long['VarGroup'] = df_long.groupby('Variable')['Value'] \
                             .transform(lambda x: pd.qcut(x, 3, labels=False, duplicates='drop') + 1)

ncol = 5
nrow = int(np.ceil(len(variables) / ncol))
fig, axes = plt.subplots(nrow, ncol, figsize=(ncol * 2, nrow * 2), sharey=True)
axes = axes.flatten()

gray_colors = plt.cm.Greys(np.linspace(0, 0.5, 3))

# plot
for i, var in enumerate(variables):
    ax = axes[i]
    subset = df_long[df_long['Variable'] == var]
    groups = [group["carbon emission"].values for name, group in subset.groupby('VarGroup')]

    box = ax.boxplot(groups, patch_artist=True, showfliers=True,
                     flierprops=dict(marker='o', markerfacecolor='black', markersize=2, linestyle='none'))

    for patch, color in zip(box['boxes'], gray_colors):
        patch.set_facecolor(color)
        patch.set_edgecolor('black')
        patch.set_alpha(0.9)

    for element in ['whiskers', 'caps', 'medians']:
        for line in box[element]:
            line.set_color('black')
            line.set_alpha(0.9)

    ax.set_title(var, fontweight='bold', fontsize=11)
    ax.set_xticklabels([str(i) for i in range(1, len(groups)+1)], fontsize=9)
    ax.tick_params(axis='y', labelsize=9)
    if i % ncol == 0:
        ax.set_ylabel('Carbon Emission', fontsize=10)

for j in range(len(variables), len(axes)):
    fig.delaxes(axes[j])

plt.tight_layout(rect=[0, 0, 1, 1])
plt.show()

**DML model**

In [None]:
def get_param_grids():
    lasso_params = {
        'alphas': [np.logspace(-5, 0, 5, 10)],
        'max_iter': [100, 500, 800, 1000],
        'cv': [5]
    }

    rf_params = {
        'n_estimators': [100, 250, 500],
        'max_depth': [3, 5, 10],
        'min_samples_split': [2, 6, 10]
    }

    xgb_params = {
        'n_estimators': [100, 250, 500],
        'max_depth': [3, 5, 10],
        'learning_rate': [0.01, 0.1, 0.3],
        'subsample': [0.3, 0.5, 1.0]
    }

    return {'ml_l': xgb_params, 'ml_m': rf_params}, {'ml_l': lasso_params, 'ml_m': rf_params}

# DML
class DoubleMLAnalysis:
    def __init__(self, data: pd.DataFrame, X_cols: List[str], y_col: str, d_cols: str):
        self.data = data
        self.X_cols = X_cols
        self.y_col = y_col
        self.d_cols = d_cols
        self.results_main = pd.DataFrame()
        self.results_robust = pd.DataFrame()
        self.cate_results = []

    def _prepare_spline_basis(self, var_name="wealth", df=5, degree=2):
        formula = f"bs({var_name}, df={df}, degree={degree})"
        design_matrix = patsy.dmatrix(formula, {var_name: self.data[var_name]})
        return pd.DataFrame(design_matrix)

    def _prepare_spline_grid(self, var_name="wealth", n_points=100, design_info=None):
        new_data = {var_name: np.linspace(self.data[var_name].min(),
                    self.data[var_name].max(), n_points)}
        return pd.DataFrame(patsy.build_design_matrices([design_info], new_data)[0])

    def _process_cate(self, cate, design_info, var_name="wealth"):
        spline_grid = self._prepare_spline_grid(var_name=var_name, design_info=design_info)
        df_cate = cate.confint(spline_grid, level=0.95, joint=True, n_rep_boot=2000)
        df_cate[var_name] = np.linspace(self.data[var_name].min(),
                                       self.data[var_name].max(),
                                       len(df_cate))
        return df_cate

    def _plot_cates(self, cate_results: List[Dict], var_name="wealth"):
        plt.figure(figsize=(5, 3))
        base_color = "#ace4e4" #colors = {'EU': '#e8e8e8','UK': '#5ac8c8','Canada': '#a5add3','US': '#be64ac','China': '#3b4994','Russia': '#ace4e4'}

        for i, res in enumerate(cate_results):
          df = res['cate_df']
          label = f"{res['model_type']} (Run {res['run']})"
          line = plt.plot(df[var_name], df['effect'],
                        color=base_color,
                        alpha=1,
                        linewidth=2,
                        label=label)

          plt.fill_between(df[var_name],
                          df['2.5 %'],
                          df['97.5 %'],
                          color=base_color,
                          alpha=0.05)

        plt.title("Russia")
        plt.xlabel(var_name.capitalize())
        plt.ylabel('Treatment Effect on Carbon Emission')
        plt.grid(True, linestyle='--', alpha=0.5)
        plt.show()

    def run_analysis(self, num_runs):
        main_param_grids, robust_param_grids = get_param_grids()

        for i in range(1, num_runs + 1):
            np.random.seed(i)
            dml_data = DoubleMLData(self.data, self.y_col, self.d_cols, self.X_cols)

            # main (XGBoost + RF)
            try:
                model_main = DoubleMLPLR(dml_data, XGBRegressor(), RandomForestRegressor())
                model_main.tune(main_param_grids, tune_on_folds=False, n_folds_tune=5)
                model_main.fit(n_jobs_cv=8)

                s = model_main.summary
                if not s.empty:
                    self.results_main = pd.concat([
                        self.results_main,
                        self._process_results(s, i, "Main")
                    ], ignore_index=True)

                # CATE
                design_matrix = patsy.dmatrix("bs(wealth, df=5, degree=2)",
                                             {"wealth": self.data["wealth"]})
                cate = model_main.cate(pd.DataFrame(design_matrix))
                df_cate = self._process_cate(cate, design_matrix.design_info)
                self.cate_results.append({
                    'run': i,
                    'model_type': 'XGBoost+RF',
                    'cate_df': df_cate,
                    'model': model_main
                })

            except Exception as e:
                print(f"Main Run {i} failed: {str(e)}")

            # robust (LASSO + RF)
            try:
                model_robust = DoubleMLPLR(dml_data, LassoCV(), RandomForestRegressor())
                model_robust.tune(robust_param_grids, tune_on_folds=False, n_folds_tune=5)
                model_robust.fit(n_jobs_cv=8)

                s = model_robust.summary
                if not s.empty:
                    self.results_robust = pd.concat([
                        self.results_robust,
                        self._process_results(s, i, "Robust")
                    ], ignore_index=True)

                cate = model_robust.cate(pd.DataFrame(design_matrix))  # 使用相同的design_matrix
                df_cate = self._process_cate(cate, design_matrix.design_info)
                self.cate_results.append({
                    'run': i,
                    'model_type': 'LASSO+RF',
                    'cate_df': df_cate,
                    'model': model_robust
                })

            except Exception as e:
                print(f"Robustness Run {i} failed: {str(e)}")

        # plot
        self._plot_cates(self.cate_results)

        return self

    def _process_results(self, summary_df, run_id, model_type):
        df = summary_df.reset_index().rename(
            columns={'index': 'term', 'std err': 'std_err', 'P>|t|': 'p_value'})
        df['run'] = run_id
        df['model_type'] = model_type
        return df

    def get_summary_stats(self):
        def _create_summary(df):
            if not df.empty:
                summary = df.groupby(['model_type', 'term']).agg({
                    'coef': ['mean', 'std'],
                    'std_err': 'mean',
                    't': 'mean',
                    'p_value': 'mean'
                })
                summary.columns = ['_'.join(col).strip() for col in summary.columns.values]
                return summary
            return pd.DataFrame()

        return {
            'main': _create_summary(self.results_main),
            'robust': _create_summary(self.results_robust)
        }

if __name__ == "__main__":
    analysis = DoubleMLAnalysis(
        data=data,
        X_cols=["carbon_emission_lag1", "temperature", "radiation", "precipitation",
                "surface temp", "windspeed", "GDP", "population", "age",
                "education", "gender", "Hhsize", "wealth", "rural"],
        y_col="carbon emission",
        d_cols="HDD_days"
    )

    # run 5 times
    analysis.run_analysis(num_runs=5)

    # summarize
    summaries = analysis.get_summary_stats()

    print("\n=== Main Analysis Summary (XGBoost + RF) ===")
    print(summaries['main'].round(4))

    print("\n=== Robustness Check Summary (LASSO + RF)  ===")
    print(summaries['robust'].round(4))

    print("\nSignif. codes: 0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1")

**Heterogeneity analysis**

In [None]:
if __name__ == "__main__":

    data['wealth_group'] = pd.qcut(data['wealth'], q=3, labels=["Low", "Medium", "High"])

    all_summaries = {}

    for group in ["Low", "Medium", "High"]:
        print(f"\n{'='*40}\nRunning analysis for Wealth Group: {group}\n{'='*40}")

        group_data = data[data['wealth_group'] == group].copy()

        analysis = DoubleMLAnalysis(
            data=group_data,
            X_cols=["carbon_emission_lag1", "temperature", "radiation", "precipitation",
                    "surface temp", "windspeed", "GDP", "population", "age",
                    "education", "gender", "Hhsize", "rural"],  # 移除了wealth_group列避免多重共线性)
            y_col="carbon emission",
            d_cols="HDD_days"
        )

        analysis.run_analysis(num_runs=5)

        summaries = analysis.get_summary_stats()
        all_summaries[group] = summaries

        print(f"\n=== {group} Wealth - Main Analysis (XGBoost+RF)  ===")
        print(summaries['main'].round(4))

        print(f"\n=== {group} Wealth - Robustness Check (LASSO + RF)  ===")
        print(summaries['robust'].round(4))

    print("\nSignif. codes: 0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1")