## Import libraries

In [1]:
import pandas as pd
import numpy as np
import os
import glob

## Read all required files (ffilel price + ffiled macro)

In [2]:
# Carbon data paths - Using standard forward_filled files (trading-only by default)
path_HBEA = "../../02_Data_Processed/01_Carbon_Markets/01_Regional/HBEA_forward_filled.parquet"
path_GDEA = "../../02_Data_Processed/01_Carbon_Markets/01_Regional/GDEA_forward_filled.parquet"

macro_dir_hubei = "../../02_Data_Processed/02_Macroeconomic_Indicators/03_Forward_Filled_Daily/hubei/"
macro_dir_guangdong = "../../02_Data_Processed/02_Macroeconomic_Indicators/03_Forward_Filled_Daily/guangdong/"
macro_dir_national_global = "../../02_Data_Processed/02_Macroeconomic_Indicators/03_Forward_Filled_Daily/national_or_global/"

In [3]:
macro_paths_hubei = glob.glob(os.path.join(macro_dir_hubei, "*.parquet"))
macro_paths_guangdong = glob.glob(os.path.join(macro_dir_guangdong, "*.parquet"))
macro_paths_national_global = glob.glob(os.path.join(macro_dir_national_global, "*.parquet"))


In [4]:

hbea = pd.read_parquet(path_HBEA)
gdea = pd.read_parquet(path_GDEA)

## Detect frequency

In [5]:
def detect_frequency(df: pd.DataFrame):
    if 'date' in df.columns:
        df = df.set_index('date')
    df.index = pd.to_datetime(df.index)

    # Find where the value changes (i.e., not forward-filled)
    changes = df['value'].ne(df['value'].shift())
    change_dates = df.index[changes]
    # Calculate gaps
    gaps = (change_dates[1:] - change_dates[:-1]).days
    median_gap = np.median(gaps)
    if median_gap <= 1.5:
        return 'D'
    elif 25 < median_gap < 35:
        return 'M'
    elif 80 < median_gap < 100:
        return 'Q'
    else:
        return f'unknown (median gap: {median_gap})'

In [6]:
macro_groups = {
    "hubei": macro_paths_hubei,
    "guangdong": macro_paths_guangdong,
    "national_global": macro_paths_national_global,
}

dfs_by_group = {}

for group_name, file_list in macro_groups.items():
    group_dfs = {}
    for path in file_list:
        name = os.path.splitext(os.path.basename(path))[0]
        df = pd.read_parquet(path)
        df = df.set_index('date')
        group_dfs[name] = df
    dfs_by_group[group_name] = group_dfs

## Shift either 1 day (daily) or 15 day (monthely, quarterly) based on frequency


In [7]:
shift_map = {'D': 1, 'M': 15, 'Q': 15}

shifted_dfs_by_group = {}

for group, group_dfs in dfs_by_group.items():
    shifted_group = {}
    for name, df in group_dfs.items():
        freq = detect_frequency(df) 
        shift_n = shift_map.get(freq, 1)
        name_shifted = f"{name}_{shift_n}"
        # Fix: Use periods parameter for shift operation
        shifted_group[name_shifted] = df.shift(periods=shift_n)
    shifted_dfs_by_group[group] = shifted_group

In [8]:
for key in shifted_dfs_by_group.keys():
    for k in shifted_dfs_by_group[key].keys():
        print(k)

Hubei_ElectricityConsumption_Monthly_ffill_daily_15
Hubei_IndustrialAddedValue_RealPrices_AboveScaleIndustry_YoY_ffill_daily_15
Hubei_GDP_Cumulative_ffill_daily_15
Guangdong_GDP_Cumulative_ffill_daily_15
Guangdong_ElectricityConsumption_Monthly_ffill_daily_15
Guangdong_IndustrialAddedValue_RealPrices_AboveScaleIndustry_YoY_ffill_daily_15
China_Output_CrudeOilProcessing_Monthly_ffill_daily_15
FuturesSettle(Cont)_BrentCrude_ffill_daily_1
FuturesClose(Cont)_NYMEX_NatGas_ffill_daily_1
China_Output_CrudeSteel_Monthly_ffill_daily_15
China_ElectricityGeneration_ThermalPower_Monthly_ffill_daily_15
China_TotalSocialFinancing_Monthly_ffill_daily_15
FuturesSettle(Cont)_EUA_Futures_ffill_daily_1
China_CPI_YoY_ffill_daily_15
China_TotalElectricityConsumption_Monthly_ffill_daily_15
China_Output_RawCoal_Monthly_ffill_daily_15
China_GDP_CurrentPrices_Cumulative_ffill_daily_15
China_Output_Cement_Monthly_ffill_daily_15
China_ManufacturingPMI_ffill_daily_15
CFETS_SpotFX_USD_CNY_ffill_daily_1


## Final join

In [9]:
def prep_macro_for_join_index(df, new_col_name):
    if 'date' in df.columns:
        df = df.set_index('date')
    df.index = pd.to_datetime(df.index)
    df.index.name = 'date'
    return df[['value']].rename(columns={'value': new_col_name})

def join_macros_on_index(main_df, macro_groups, group_keys):
    result = main_df.copy()
    if 'date' in result.columns:
        result = result.set_index('date')
    result.index = pd.to_datetime(result.index)
    result.index.name = 'date'
    
    # Track column names to avoid duplicates
    existing_cols = set(result.columns)
    
    for group in group_keys:
        for col_name, macro_df in macro_groups[group].items():
            # Skip if column already exists
            if col_name in existing_cols:
                print(f"Skipping duplicate column: {col_name}")
                continue
                
            macro_ready = prep_macro_for_join_index(macro_df, col_name)
            result = result.join(macro_ready, how='left')
            existing_cols.add(col_name)
    
    return result

In [10]:
hbea_final = join_macros_on_index(hbea, shifted_dfs_by_group, ['hubei', 'national_global'])
gdea_final = join_macros_on_index(gdea, shifted_dfs_by_group, ['guangdong', 'national_global'])
hbea_final

Unnamed: 0_level_0,close,vwap,volume_tons,turnover_cny,cum_turnover_cny,is_open,is_quiet,has_trade,gap_days,post_weekend,...,China_ElectricityGeneration_ThermalPower_Monthly_ffill_daily_15,China_TotalSocialFinancing_Monthly_ffill_daily_15,FuturesSettle(Cont)_EUA_Futures_ffill_daily_1,China_CPI_YoY_ffill_daily_15,China_TotalElectricityConsumption_Monthly_ffill_daily_15,China_Output_RawCoal_Monthly_ffill_daily_15,China_GDP_CurrentPrices_Cumulative_ffill_daily_15,China_Output_Cement_Monthly_ffill_daily_15,China_ManufacturingPMI_ffill_daily_15,CFETS_SpotFX_USD_CNY_ffill_daily_1
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2014-04-28,24.43,24.49,57357.0,1404897.41,2051655.0,True,False,True,0,False,...,3754.8,20934.0,5.06,2.3848,45440000.0,30500.0,143539.4,20484.7,50.3,6.2536
2014-04-29,24.41,24.25,62994.0,1527794.05,2114649.0,True,False,True,1,False,...,3754.8,20934.0,5.20,2.3848,45440000.0,30500.0,143539.4,20484.7,50.3,6.2530
2014-04-30,24.38,24.36,83552.0,2034943.25,2198201.0,True,False,True,1,False,...,3754.8,20934.0,5.41,2.3848,45440000.0,30500.0,143539.4,20484.7,50.3,6.2580
2014-05-05,24.11,24.19,50770.0,1228302.70,2248971.0,True,False,True,5,False,...,3754.8,20934.0,5.18,2.3848,45440000.0,30500.0,143539.4,20484.7,50.3,6.2593
2014-05-06,24.10,23.83,42494.0,1012618.86,2291465.0,True,False,True,1,False,...,3754.8,20934.0,5.21,2.3848,45440000.0,30500.0,143539.4,20484.7,50.3,6.2455
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2025-04-25,37.37,37.37,768.0,28703.86,101208019.0,True,False,True,1,False,...,,,65.28,,,,,,,7.2931
2025-04-28,39.09,39.09,3.0,117.27,101208022.0,True,False,True,3,True,...,,,65.40,,,,,,,7.2832
2025-04-29,37.29,37.29,4307.0,160590.26,101212329.0,True,False,True,1,False,...,,,64.28,,,,,,,7.2995
2025-04-30,41.00,41.00,3000.0,123000.00,101215329.0,True,False,True,1,False,...,,,,,,,,,,


## Save final datasets

In [11]:
# Create output directory
output_dir = "../../02_Data_Processed/03_Feature_Engineered/"
os.makedirs(output_dir, exist_ok=True)
print(f"Output directory created: {output_dir}")

Output directory created: ../../02_Data_Processed/03_Feature_Engineered/


In [12]:
# Save the joined datasets
hbea_final.to_parquet(os.path.join(output_dir, "HBEA_daily_with_macro.parquet"))
gdea_final.to_parquet(os.path.join(output_dir, "GDEA_daily_with_macro.parquet"))

print(f"Saved HBEA data: {hbea_final.shape}")
print(f"Saved GDEA data: {gdea_final.shape}")

Saved HBEA data: (2679, 28)
Saved GDEA data: (2638, 28)


## Data validation

In [13]:
# Check for missing values
print("Missing values in HBEA:")
print(hbea_final.isnull().sum().sum())
print("\nMissing values in GDEA:")
print(gdea_final.isnull().sum().sum())

# Date range verification
print(f"\nHBEA Date Range: {hbea_final.index.min()} to {hbea_final.index.max()}")
print(f"GDEA Date Range: {gdea_final.index.min()} to {gdea_final.index.max()}")

# Column summary
print(f"\nTotal columns in final dataset: {len(hbea_final.columns)}")
carbon_cols = [col for col in hbea_final.columns if col in ['close_price', 'volume', 'is_open']]
macro_cols = [col for col in hbea_final.columns if col not in ['close_price', 'volume', 'is_open']]
print(f"Carbon columns: {carbon_cols}")
print(f"Number of macro columns: {len(macro_cols)}")

Missing values in HBEA:
294

Missing values in GDEA:
294

HBEA Date Range: 2014-04-28 00:00:00 to 2025-05-06 00:00:00
GDEA Date Range: 2014-06-27 00:00:00 to 2025-05-06 00:00:00

Total columns in final dataset: 28
Carbon columns: ['is_open']
Number of macro columns: 27


## Summary statistics

In [14]:
# Generate summary statistics for numeric columns
numeric_cols_hbea = hbea_final.select_dtypes(include=[np.number]).columns
numeric_cols_gdea = gdea_final.select_dtypes(include=[np.number]).columns

# Create summary statistics DataFrame
summary_stats = pd.DataFrame({
    'HBEA_mean': hbea_final[numeric_cols_hbea].mean(),
    'HBEA_std': hbea_final[numeric_cols_hbea].std(),
    'GDEA_mean': gdea_final[numeric_cols_gdea].mean(),
    'GDEA_std': gdea_final[numeric_cols_gdea].std()
})

# Save summary statistics
summary_stats.to_csv(os.path.join(output_dir, "summary_statistics.csv"))
print("Summary statistics saved.")
print("\nFirst few rows of summary statistics:")
print(summary_stats.head(10))

Summary statistics saved.

First few rows of summary statistics:
                                                       HBEA_mean  \
CFETS_SpotFX_USD_CNY_ffill_daily_1                  6.736871e+00   
China_CPI_YoY_ffill_daily_15                        1.599534e+00   
China_ElectricityGeneration_ThermalPower_Monthl...  4.396421e+03   
China_GDP_CurrentPrices_Cumulative_ffill_daily_15   5.957255e+05   
China_ManufacturingPMI_ffill_daily_15               5.020896e+01   
China_Output_Cement_Monthly_ffill_daily_15          2.016114e+04   
China_Output_CrudeOilProcessing_Monthly_ffill_d...  5.275009e+07   
China_Output_CrudeSteel_Monthly_ffill_daily_15      7.832252e+03   
China_Output_RawCoal_Monthly_ffill_daily_15         3.357125e+04   
China_TotalElectricityConsumption_Monthly_ffill...  6.160725e+07   

                                                        HBEA_std  \
CFETS_SpotFX_USD_CNY_ffill_daily_1                  3.458721e-01   
China_CPI_YoY_ffill_daily_15                      

## Output Description

This notebook creates two main datasets with carbon prices joined with lagged macroeconomic indicators:

### Output Files:
- `HBEA_daily_with_macro.parquet` - Hubei carbon prices with macro features
- `GDEA_daily_with_macro.parquet` - Guangdong carbon prices with macro features
- `summary_statistics.csv` - Summary statistics for all numeric columns

### Dataset Structure:
Each dataset contains:
- **Index**: `date` (daily frequency, includes both trading and non-trading days)
- **Carbon columns**: 
  - `close_price` - Daily closing price
  - `volume` - Trading volume
  - `is_open` - Whether the market was open (1) or closed (0)
- **Macro columns**: All indicators with appropriate lags applied
  - Format: `[indicator_name]_[lag_days]`
  - Example: `China_CPI_YoY_ffill_daily_15` (15-day lag for monthly data)
  - Example: `CFETS_SpotFX_USD_CNY_ffill_daily_1` (1-day lag for daily data)

### Lag Structure:
The lag structure ensures no look-ahead bias in trading strategies:
- **Daily indicators**: 1-day lag (e.g., FX rates, energy futures)
- **Monthly/Quarterly indicators**: 15-day lag (e.g., CPI, Industrial Value Added, GDP)

This means that on any given trading day, you only have access to information that would have been publicly available by that date.