# Domain Driven Features

The EPC data contains several categorical variables with a large number of distinct values. To find features that retain the most information, three feature sets are explored:
* data driven 
* domain driven 
* exhaustive 

The domain-driven approach groups categories together using expert, domain-based knowledge. Examples include grouping local authorities into rural, urban and suburban, or pulling out key features such as ‘pitched roof’ or ‘insulated floor’. Note that although the domain-driven features are easier to understand, this approach is not expected to be as powerful as the data-driven approach, because predictive performance has been traded for interpretability.

This script groups the levels of the categorical fields and bins the numerical fields.

In [1]:
import datetime
import glob
import json
import os
import re

import numpy as np
import pandas as pd

In [3]:
# Read the shared project configuration and pull out the paths/file names used below.
# NOTE(review): the [:-7] slice strips the last seven characters of the parent
# directory's absolute path, so it only works for one specific folder layout - confirm.
config_path = os.path.abspath('..')[:-7]

with open(config_path + '/config.json') as f:
    config = json.load(f)

(processing_path,
 epc_train_clean_fname,
 epc_test_clean_fname,
 epc_train_do_fname,
 epc_test_do_fname) = (
    config['DEFAULT'][key]
    for key in ('processing_path',
                'epc_train_clean_fname',
                'epc_test_clean_fname',
                'epc_train_domain_fname',
                'epc_test_domain_fname')
)

In [4]:
# INSPECTION_DATE is read as text and then parsed to a datetime by read_csv
dtype_dict = {'INSPECTION_DATE': 'str'}

# load the cleaned train and test extracts with identical parsing options
epc_train, epc_test = (
    pd.read_csv(os.path.join(processing_path, fname),
                header=0,
                delimiter=',',
                dtype=dtype_dict,
                parse_dates=['INSPECTION_DATE'])
    for fname in (epc_train_clean_fname, epc_test_clean_fname)
)

In [5]:
# Restore `chaid_dict` from IPython's %store database; it is used further down
# for the ENERGY_TARIFF and TRANSACTION_TYPE groupings.
# NOTE(review): presumably written by the data-driven (CHAID) notebook, which would
# then have to be run before this one - confirm.
%store -r chaid_dict

### Creating fields relating to insulation across all relevant variables

Background on thermal transmittance (U-values): https://en.wikipedia.org/wiki/Thermal_transmittance

In [6]:
def att_fields(df):
    """Derive thermal-transmittance ("att") features from the fabric descriptions.

    For each of FLOOR_DESCRIPTION, WALLS_DESCRIPTION and ROOF_DESCRIPTION the
    first number found in the text is stored as ``<prefix>_att`` (NaN when the
    description holds no number) and compared with fixed thresholds to give
    the 0/1 flags ``<prefix>_att_good`` and ``<prefix>_att_poor``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame containing the three description columns. Modified in place.

    Returns
    -------
    pandas.DataFrame
        The same frame, with nine columns added (floors_*, walls_*, roof_*).
    """
    # One or more digits with an optional decimal part. The dot is escaped and
    # every decimal is kept: the previous roof pattern (\d.\d) stopped after one
    # decimal, so "0.18" was read as 0.1 and wrongly flagged as good.
    number_pattern = r'\d+(?:\.\d+)?'

    # (column prefix, source column, "good" upper bound, "poor" lower bound,
    #  ignore descriptions containing 'mm ')
    # Floor and roof descriptions containing 'mm ' are never flagged, presumably
    # because their number is a thickness rather than a transmittance. Walls
    # were never filtered this way and that behaviour is kept.
    specs = (
        ('floors', 'FLOOR_DESCRIPTION', 0.2, 1, True),
        ('walls', 'WALLS_DESCRIPTION', 0.25, 1.5, False),
        ('roof', 'ROOF_DESCRIPTION', 0.15, 1, True),
    )

    for prefix, source, good_max, poor_min, skip_thickness in specs:
        text = df[source].astype(str)

        # first number in the description, NaN when there is none
        value = text.str.findall(number_pattern).str[0].astype(float)
        df[prefix + '_att'] = value

        # comparisons with NaN are False, so missing values give 0 for both flags
        good = value <= good_max
        poor = value >= poor_min
        if skip_thickness:
            is_thickness = text.str.contains('mm ', regex=False, na=False)
            good = good & ~is_thickness
            poor = poor & ~is_thickness

        df[prefix + '_att_good'] = good.astype(int)
        df[prefix + '_att_poor'] = poor.astype(int)

    return df

In [7]:
# add the thermal-transmittance columns to both data sets
epc_train, epc_test = att_fields(epc_train), att_fields(epc_test)

In [8]:
# list of all descriptions from variables related to insulation (train and test combined)
roof_descriptions = set(epc_train['ROOF_DESCRIPTION']).union(set(epc_test['ROOF_DESCRIPTION']))
floor_descriptions = set(epc_train['FLOOR_DESCRIPTION']).union(set(epc_test['FLOOR_DESCRIPTION']))
wall_descriptions = set(epc_train['WALLS_DESCRIPTION']).union(set(epc_test['WALLS_DESCRIPTION']))

all_descriptions = roof_descriptions.union(floor_descriptions)
all_descriptions = all_descriptions.union(wall_descriptions)

# A thickness of exactly zero. The look-behind stops '100mm insulation',
# '50mm insulation' etc. from being mistaken for '0mm insulation', which a
# plain substring test does.
zero_thickness = re.compile(r'(?<!\d)0mm insulation')

# separate list of descriptions indicating partial insulation
partial_insulation_desc1 = [x for x in all_descriptions if 'limited insulation' in str(x)]
partial_insulation_desc2 = [x for x in all_descriptions if 'partial insulation' in str(x)]
partial_insulation_desc3 = [x for x in all_descriptions if 'insulated' in str(x) and 'no insulation' in str(x)]
partial_insulation_desc4 = [x for x in all_descriptions if 'insulated' in str(x) and zero_thickness.search(str(x))]

# the fourth group was previously computed but left out of the union
partial_insulation_desc = set(partial_insulation_desc1 + partial_insulation_desc2
                              + partial_insulation_desc3 + partial_insulation_desc4)

# separate list of descriptions indicating complete insulation
insulated_desc1 = [x for x in all_descriptions if 'loft insulation' in str(x)]
insulated_desc2 = [x for x in all_descriptions if 'mm insulation' in str(x) and not zero_thickness.search(str(x))]
# mixed descriptions ('insulated' together with 'no insulation' or a zero
# thickness) belong to the partial group, so they are excluded here
insulated_desc3 = [x for x in all_descriptions if 'insulated' in str(x) and 'no insulation' not in str(x)
                   and not zero_thickness.search(str(x))]

insulated_desc = set(insulated_desc1 + insulated_desc2 + insulated_desc3)

# all other descriptions which likely related to no insulation
no_insulation_desc = [x for x in all_descriptions if x not in partial_insulation_desc and x not in insulated_desc]

# dictionary of insulation terms to be used in replace
# update order matters: a description in both the partial and the insulated
# set ends up as 'insulated'
insulation_dict = dict.fromkeys(partial_insulation_desc,'partial insulation')
insulation_dict2 = dict.fromkeys(insulated_desc,'insulated')
insulation_dict3 = dict.fromkeys(no_insulation_desc,'no insulation')
insulation_dict.update(insulation_dict2)
insulation_dict.update(insulation_dict3)

In [9]:
# label every fabric description with its insulation class, in both data sets
for frame in (epc_train, epc_test):
    for target, source in (('wall_insulation', 'WALLS_DESCRIPTION'),
                           ('floor_insulation', 'FLOOR_DESCRIPTION'),
                           ('roof_insulation', 'ROOF_DESCRIPTION')):
        frame[target] = frame[source].replace(insulation_dict)

Combining att fields and insulation fields

In [10]:
# A good thermal-transmittance flag overrides the text-based insulation class;
# otherwise the text-based class is kept. Series.mask replaces the values where
# the condition is True, which avoids a Python-level lambda call per row.
for frame in (epc_train, epc_test):
    for target, good_flag, text_class in (('wall_insul', 'walls_att_good', 'wall_insulation'),
                                          ('floor_insul', 'floors_att_good', 'floor_insulation'),
                                          ('roof_insul', 'roof_att_good', 'roof_insulation')):
        frame[target] = frame[text_class].mask(frame[good_flag] == 1, 'insulated')

### Derived fields from roof type

In [11]:
def roof_types(df):
    """Add 0/1 roof-type flags derived from ROOF_DESCRIPTION.

    ``pitched_roof`` and ``flat_roof`` are 1 when the lower-cased description
    contains 'pitched' / 'flat' respectively, else 0 (missing descriptions
    give 0). The frame is modified in place and returned.
    """
    # lower-case once and match literally; vectorised instead of a per-row lambda
    description = df['ROOF_DESCRIPTION'].astype(str).str.lower()

    for column, keyword in (('pitched_roof', 'pitched'), ('flat_roof', 'flat')):
        df[column] = description.str.contains(keyword, regex=False, na=False).astype(int)

    return df

# add the roof-type flags to both data sets
epc_train, epc_test = roof_types(epc_train), roof_types(epc_test)

### Derived fields from wall type

In [12]:
def wall_types(df):
    """Add 0/1 wall-construction flags derived from WALLS_DESCRIPTION.

    Each flag is 1 when the lower-cased description contains the matching
    phrase, else 0 (missing descriptions give 0). The frame is modified in
    place and returned.
    """
    # lower-case once and match literally; vectorised instead of a per-row lambda
    description = df['WALLS_DESCRIPTION'].astype(str).str.lower()

    phrases = (
        ('cavity_wall', 'cavity wall'),
        ('granite_wall', 'granite or whinstone'),
        ('timber_wall', 'timber frame'),
        ('sandstone_wall', 'sandstone'),
        ('brick_wall', 'solid brick'),
    )
    for column, phrase in phrases:
        df[column] = description.str.contains(phrase, regex=False, na=False).astype(int)

    return df

# add the wall-type flags to both data sets
epc_train, epc_test = wall_types(epc_train), wall_types(epc_test)

### Derived fields from floor type

In [13]:
def floor_types(df):
    """Add 0/1 floor-type flags derived from FLOOR_DESCRIPTION.

    ``solid_floor`` and ``suspended_floor`` are 1 when the lower-cased
    description contains 'solid' / 'suspended' respectively, else 0 (missing
    descriptions give 0). The frame is modified in place and returned.
    """
    # lower-case once and match literally; vectorised instead of a per-row lambda
    description = df['FLOOR_DESCRIPTION'].astype(str).str.lower()

    for column, keyword in (('solid_floor', 'solid'), ('suspended_floor', 'suspended')):
        df[column] = description.str.contains(keyword, regex=False, na=False).astype(int)

    return df

# add the floor-type flags to both data sets
epc_train, epc_test = floor_types(epc_train), floor_types(epc_test)

### Derived fields from hotwater types

In [14]:
def hotwater_types(df):
    """Add 0/1 hot-water flags derived from HOTWATER_DESCRIPTION.

    Each flag is 1 when the lower-cased description contains the matching
    phrase, else 0 (missing descriptions give 0). The frame is modified in
    place and returned.
    """
    # lower-case once and match literally; vectorised instead of a per-row lambda
    description = df['HOTWATER_DESCRIPTION'].astype(str).str.lower()

    phrases = (
        ('hotwater_mains', 'from main system'),
        ('hotwater_immersion', 'immersion'),
        # NOTE(review): 'commmunity' (three m's) is the existing output column
        # name; it is kept because files written from this frame carry it.
        ('hotwater_commmunity', 'community'),
        ('hotwater_solar', 'solar'),
        ('hotwater_gas', 'gas'),
        ('hotwater_recovery', 'recovery'),
    )
    for column, phrase in phrases:
        df[column] = description.str.contains(phrase, regex=False, na=False).astype(int)

    return df

# add the hot-water flags to both data sets
epc_train, epc_test = hotwater_types(epc_train), hotwater_types(epc_test)

### Derived fields from window description

In [15]:
# window description
window_dict = dict.fromkeys(['full double glazing','full triple glazing',
                             'high performance glazing','multiple glazing throughout','double glazing',
                             'multiple glazing throughout double glazing'],1)

window_dict2 = dict.fromkeys(['partial double glazing','mostly double glazing',
                              'some double glazing','single glazing and double glazing','mostly multiple glazing',
                              'partial multiple glazing','single and multiple glazing','mostly triple glazing',
                              'some multiple glazing','partial triple glazing','some triple glazing'],1)

window_dict3 = dict.fromkeys(['full secondary glazing','partial secondary glazing',
                              'mostly secondary glazing','some secondary glazing','secondary glazing',
                              'single glazing and secondary glazing'],1)

In [16]:
def window_types(df):
    """Add 0/1 glazing flags looked up from WINDOWS_DESCRIPTION.

    The description is matched exactly against the three window lookup
    dictionaries; descriptions missing from a dictionary give 0. The raw
    lookup columns are written to ``df``; the returned frame is the copy
    produced by ``fillna`` with the three flags cast to int.
    """
    lookups = {
        'window_multiple': window_dict,
        'window_partial_multiple': window_dict2,
        'window_secondary': window_dict3,
    }

    # unmatched descriptions come back from map() as NaN
    for flag, lookup in lookups.items():
        df[flag] = df['WINDOWS_DESCRIPTION'].map(lookup)

    # only the three flag columns are filled, other columns keep their NaNs
    filled = df.fillna(dict.fromkeys(lookups, 0))

    return filled.astype(dict.fromkeys(lookups, int))

# add the glazing flags to both data sets
epc_train, epc_test = window_types(epc_train), window_types(epc_test)

### Derived fields from heating control types

In [17]:
def heat_control_types(df):
    """Add 0/1 heating-control flags derived from MAIN_HEATING_CONTROLS.

    ``heat_control_programmer`` and ``heat_control_trv`` are 1 when the
    lower-cased description contains 'programmer' / 'trvs' respectively,
    else 0 (missing descriptions give 0). The frame is modified in place and
    returned.
    """
    # lower-case once and match literally; vectorised instead of a per-row lambda
    description = df['MAIN_HEATING_CONTROLS'].astype(str).str.lower()

    for column, keyword in (('heat_control_programmer', 'programmer'), ('heat_control_trv', 'trvs')):
        df[column] = description.str.contains(keyword, regex=False, na=False).astype(int)

    return df

# add the heating-control flags to both data sets
epc_train, epc_test = heat_control_types(epc_train), heat_control_types(epc_test)

In [18]:
# list of descriptions indicating room thermostats

# every distinct heating-control description seen in either data set
heating_control_list = set(epc_train['MAIN_HEATING_CONTROLS']).union(set(epc_test['MAIN_HEATING_CONTROLS']))

room_thermostat = [x for x in heating_control_list if 'thermostat' in str(x)
                   and 'no room thermostat' not in str(x) and 'no thermostat' not in str(x)
                   and 'appliance thermostat' not in str(x)
                   # excluded here instead of with list.remove(), which raises
                   # ValueError when the description is not present in the data
                   and x != 'no time or thermostat control of room temperature']

# all other descriptions which likely related to no thermostat
no_thermostat_desc = [x for x in heating_control_list if x not in room_thermostat]

# lookup kept for reference: description -> 1 (room thermostat) or 0
room_thermostat_dict = dict.fromkeys(room_thermostat,1)
no_thermostat_dict = dict.fromkeys(no_thermostat_desc,0)

room_thermostat_dict.update(no_thermostat_dict)

# Every description is in exactly one of the two lists, so membership of the
# thermostat list gives the same 1/0 values as replacing through the lookup,
# with a guaranteed integer dtype (no reliance on replace() downcasting text to int).
epc_train['heat_control_room_thermostat'] = epc_train['MAIN_HEATING_CONTROLS'].isin(room_thermostat).astype(int)
epc_test['heat_control_room_thermostat'] = epc_test['MAIN_HEATING_CONTROLS'].isin(room_thermostat).astype(int)

## Other features, grouped as in the data-driven approach

### Built Form

In [19]:
# collapse the four built forms into terraced / detached
# (note that semi-detached is grouped with detached)
built_dict1 = {'Semi-Detached': 'detached', 'Detached': 'detached'}
built_dict = {'Mid-Terrace': 'terraced', 'End-Terrace': 'terraced', **built_dict1}

for frame in (epc_train, epc_test):
    frame['built_form'] = frame['BUILT_FORM'].replace(built_dict)

### Energy Tariff

In [20]:
# 'off-peak 18 hour' is missing from the CHAID output due to small volumes, so
# it is added to node2 by hand; the null marker is removed from node1.
# Both edits are guarded so the cell can be re-run: an unconditional
# append/remove duplicates the entry and then raises ValueError on the second run.
# ('Unknown' is deliberately left in node1; it is mapped to NaN when the lookup is built.)
if 'off-peak 18 hour' not in chaid_dict['ENERGY_TARIFF']['node2']:
    chaid_dict['ENERGY_TARIFF']['node2'].append('off-peak 18 hour')
if '<missing>' in chaid_dict['ENERGY_TARIFF']['node1']:
    chaid_dict['ENERGY_TARIFF']['node1'].remove('<missing>')

In [21]:
# node1 tariffs -> 'single/dual', node2 tariffs -> 'off-peak', 'Unknown' -> missing
energy_dict1 = dict.fromkeys(chaid_dict['ENERGY_TARIFF']['node2'], 'off-peak')

energy_dict = {tariff: 'single/dual' for tariff in chaid_dict['ENERGY_TARIFF']['node1']}
energy_dict['Unknown'] = np.nan
# applied last, so a node2 entry wins over anything set above
energy_dict.update(energy_dict1)

for frame in (epc_train, epc_test):
    frame['energy_tariff'] = frame['ENERGY_TARIFF'].replace(energy_dict)

### Floor Level

In [22]:
# group the individual floor levels into four bands
floor_level_dict = dict.fromkeys(['Ground','ground floor','Basement'],'ground floor')
floor_level_dict1 = dict.fromkeys(['1st','2nd','3rd','4th'],'low floors')
floor_level_dict2 = dict.fromkeys(['mid floor','5th','6th','7th','8th','9th','10th','11th'],'mid floors')
# This band was previously labelled 'mid floors' as well, which made it
# indistinguishable from the band above (looks like a copy-paste slip).
# NOTE(review): confirm 'top floors' is the intended label before retraining.
floor_level_dict3 = dict.fromkeys(['top floor','12th','13th','14th','15th','16th','17th','18th','19th','20th',
                                   '21st or above'],'top floors')
floor_level_dict.update(floor_level_dict1)
floor_level_dict.update(floor_level_dict2)
floor_level_dict.update(floor_level_dict3)
epc_train['floor_level'] = epc_train['FLOOR_LEVEL'].replace(floor_level_dict)
epc_test['floor_level'] = epc_test['FLOOR_LEVEL'].replace(floor_level_dict)

### Glazed Type

In [23]:
# group the glazing types; 'INVALID!' is treated as missing
glazed_dict1 = {'triple, known data': 'triple glazing',
                'triple glazing': 'triple glazing'}
glazed_dict2 = {'secondary glazing': 'old glazing',
                'not defined': 'old glazing',
                'single glazing': 'old glazing'}
glazed_dict3 = {'double, known data': 'double glazing',
                'double glazing installed during or after 2002': 'double glazing'}

glazed_dict = {
    'double glazing installed before 2002': 'old double glazing',
    'double glazing, unknown install date': 'old double glazing',
    **glazed_dict1,
    **glazed_dict2,
    **glazed_dict3,
    'INVALID!': np.nan,
}

for frame in (epc_train, epc_test):
    frame['glazed_type'] = frame['GLAZED_TYPE'].replace(glazed_dict)

### Property Type

In [24]:
# bungalows and park homes are merged; all other property types keep their original value
prop_type_dict = {'Bungalow': 'one storey building', 'Park home': 'one storey building'}

for frame in (epc_train, epc_test):
    frame['property_type'] = frame['PROPERTY_TYPE'].replace(prop_type_dict)

### Transaction Type

In [25]:
# drop the null marker from node1; guarded so that re-running the cell does not
# raise ValueError once the marker has already been removed
if '<missing>' in chaid_dict['TRANSACTION_TYPE']['node1']:
    chaid_dict['TRANSACTION_TYPE']['node1'].remove('<missing>')

In [26]:
# CHAID nodes 1 and 3 -> 'private rental and sale', nodes 2 and 4 -> 'social rental and new build'
# (node5 is not mapped)
trans_dict1 = dict.fromkeys(chaid_dict['TRANSACTION_TYPE']['node2'], 'social rental and new build')
trans_dict2 = dict.fromkeys(chaid_dict['TRANSACTION_TYPE']['node3'], 'private rental and sale')
trans_dict3 = dict.fromkeys(chaid_dict['TRANSACTION_TYPE']['node4'], 'social rental and new build')

# later entries win on duplicate keys, the same precedence as successive update() calls
trans_dict = {
    **dict.fromkeys(chaid_dict['TRANSACTION_TYPE']['node1'], 'private rental and sale'),
    **trans_dict1,
    **trans_dict2,
    **trans_dict3,
    'unknown': np.nan,
}

for frame in (epc_train, epc_test):
    frame['transaction_type'] = frame['TRANSACTION_TYPE'].replace(trans_dict)

### Region

In [27]:
# classify each local authority as rural or suburban
# 'Denbighshire' was only present under the misspelling 'Debighshire', so the
# correctly spelled name was never mapped; the old spelling is kept as well in
# case it occurs in the data.
region_dict = dict.fromkeys(['Blaenau Gwent','Neath Port Talbot','Pembrokeshire','Rhondda Cynon Taf','Caerphilly',
                             'Flintshire','Carmarthenshire','Powys','Conwy','Ceredigion','Debighshire',
                             'Denbighshire','Gwynedd','Isle of Anglesey'],'rural')
region_dict1 = dict.fromkeys(['Bridgend','Monmouthshire','Wrexham','Merthyr Tydfil','Vale of Glamorgan','Cardiff',
                              'Torfaen','Newport','Swansea'],'suburban')
region_dict.update(region_dict1)
epc_train['locality'] = epc_train['region'].replace(region_dict)
epc_test['locality'] = epc_test['region'].replace(region_dict)

In [46]:
# features that correlate with each other or are leading
correlated_variables = '''
    CO2_EMISS_CURR_PER_FLOOR_AREA CO2_EMISSIONS_CURRENT ENERGY_CONSUMPTION_CURRENT
    HEATING_COST_CURRENT HOT_WATER_COST_CURRENT HOT_WATER_ENERGY_EFF HOT_WATER_ENV_EFF
    LIGHTING_COST_CURRENT LIGHTING_ENERGY_EFF LIGHTING_ENV_EFF LMK_KEY LOW_ENERGY_LIGHTING
    MAIN_FUEL MAINHEAT_ENERGY_EFF MAINHEAT_ENV_EFF MAINHEATC_ENERGY_EFF MAINHEATC_ENV_EFF
    MAINHEATCONT_DESCRIPTION MECHANICAL_VENTILATION MULTI_GLAZE_PROPORTION NUMBER_HEATED_ROOMS
    POSTCODE ROOF_ENERGY_EFF ROOF_ENV_EFF SECONDHEAT_DESCRIPTION WALLS_ENERGY_EFF
    WALLS_ENV_EFF WINDOWS_ENERGY_EFF WINDOWS_ENV_EFF
'''.split()

# original fields superseded by grouped/binned features
replace_features = '''
    region CURRENT_ENERGY_RATING PROPERTY_TYPE BUILT_FORM INSPECTION_DATE TRANSACTION_TYPE
    ENERGY_TARIFF FLOOR_LEVEL GLAZED_TYPE EXTENSION_COUNT NUMBER_HABITABLE_ROOMS
    NUMBER_OPEN_FIREPLACES HOTWATER_DESCRIPTION FLOOR_DESCRIPTION MAIN_HEATING_CONTROLS
    WINDOWS_DESCRIPTION WALLS_DESCRIPTION ROOF_DESCRIPTION LIGHTING_DESCRIPTION
    FLOOR_HEIGHT
'''.split()

# other fields not needed, including the intermediate columns created in this notebook
fields_to_drop = '''
    floors_average_thermal_transmittance low_energy_lighting_perc
    roof_average_thermal_transmittance walls_average_thermal_transmittance
    floors_att walls_att roof_att wall_insulation
    floor_insulation roof_insulation
'''.split()

In [49]:
# remove the three groups of unwanted columns from both data sets, in place
for frame in (epc_train, epc_test):
    for unwanted in (correlated_variables, replace_features, fields_to_drop):
        frame.drop(unwanted, axis=1, inplace=True)

## export data

In [50]:
# write the domain-driven feature sets to the processing folder, without the index
for frame, fname in ((epc_train, epc_train_do_fname), (epc_test, epc_test_do_fname)):
    frame.to_csv(os.path.join(processing_path, fname), index=False)