# Hourly Electricity‑Meter ETL Pipeline  
This notebook imports raw building‑level electricity data, cleans and reshapes it, aligns timestamps to UTC, enriches the data with building metadata and hourly weather, and writes the result to a compressed Parquet file.

In [18]:
import os
import pandas as pd
import numpy as np
from glob import glob
import warnings
warnings.filterwarnings('ignore', 
                       message='DataFrameGroupBy.apply operated on the grouping columns')

In [19]:
path_electricity = '../data/raw/electricity.csv'
path_combined = '../data/combinedmeters/'

Reshapes the meter table from wide format (one column per building) to long format (one row per timestamp and building) and stores the result in a single dataframe, `complete_data`.

In [20]:
meter = pd.read_csv(path_electricity)
# Wide -> long: one row per (timestamp, building_id) pair
complete_data = pd.melt(meter, id_vars="timestamp", var_name="building_id", value_name="meter_reading")

del meter  # release the wide table

print(f"Processed electricity meter data with {len(complete_data)} readings")
print(f"Unique buildings: {complete_data['building_id'].nunique()}")

Processed electricity meter data with 27684432 readings
Unique buildings: 1578


In [21]:
complete_data.head()

Unnamed: 0,timestamp,building_id,meter_reading
0,2016-01-01 00:00:00,Panther_parking_Lorriane,0.0
1,2016-01-01 01:00:00,Panther_parking_Lorriane,0.0
2,2016-01-01 02:00:00,Panther_parking_Lorriane,0.0
3,2016-01-01 03:00:00,Panther_parking_Lorriane,0.0
4,2016-01-01 04:00:00,Panther_parking_Lorriane,0.0
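
The wide-to-long reshape can be illustrated on a toy table. This is a minimal sketch; the column names `Building_A` and `Building_B` and all values are made up for illustration and are not part of the dataset.

```python
import pandas as pd

# Toy wide table: one row per timestamp, one column per (hypothetical) building.
wide = pd.DataFrame({
    "timestamp": ["2016-01-01 00:00:00", "2016-01-01 01:00:00"],
    "Building_A": [1.5, 2.0],
    "Building_B": [0.0, None],
})

# Every non-id column becomes a (building_id, meter_reading) pair per timestamp.
long_df = pd.melt(
    wide,
    id_vars="timestamp",
    var_name="building_id",
    value_name="meter_reading",
)
print(long_df)
```

The long table has one row per timestamp for each building column, so missing readings stay in the table as explicit `NaN` rows rather than disappearing.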


## Type Casting & Basic Profiling  
* Round readings to four decimals for storage efficiency.  
* Parse `timestamp` as `datetime`.  
* Inspect dtypes and memory usage.  

In [22]:
complete_data["meter_reading"] = complete_data["meter_reading"].round(4)
complete_data["timestamp"] = pd.to_datetime(complete_data["timestamp"], format='%Y-%m-%d %H:%M:%S')
complete_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 27684432 entries, 0 to 27684431
Data columns (total 3 columns):
 #   Column         Dtype         
---  ------         -----         
 0   timestamp      datetime64[ns]
 1   building_id    object        
 2   meter_reading  float64       
dtypes: datetime64[ns](1), float64(1), object(1)
memory usage: 633.6+ MB
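
Rounding changes the stored values but not the in-memory size of a `float64` column, so any size benefit would show up, if at all, in the compressed file. If the in-memory footprint matters, one possible option is to change dtypes instead. The sketch below uses synthetic data and hypothetical building names; whether `float32` precision is acceptable for meter readings is an assumption that would need checking against the real data.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
n = 10_000

# Synthetic stand-in for the long meter table (values are random, not real readings).
toy = pd.DataFrame({
    "building_id": rng.choice(["Building_A", "Building_B", "Building_C"], size=n),
    "meter_reading": (rng.random(n) * 100).round(4),
})
before = toy.memory_usage(deep=True).sum()

# Repeated strings compress well as a categorical; float32 halves the numeric column.
compact = toy.assign(
    building_id=toy["building_id"].astype("category"),
    meter_reading=toy["meter_reading"].astype("float32"),
)
after = compact.memory_usage(deep=True).sum()

print(f"{before:,} bytes -> {after:,} bytes")
```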


## Helper: Local → UTC Converter  
Defines `local_to_utc(group)` that localizes per‑building timestamps using the building’s timezone and converts them to UTC, handling DST edge cases.  

In [23]:
def local_to_utc(group):
    local_tz = group["timezone"].iloc[0]
    # Keep the original timestamp (assumed to be local)
    # Create a new column for the UTC timestamp
    group["timestamp_utc"] = (
        group["timestamp"]
        .dt.tz_localize(
            local_tz,
            nonexistent="shift_forward",  # spring-forward gap: move to the next valid local time
            ambiguous="NaT"               # fall-back overlap: mark the repeated hour as NaT
        )
        .dt.tz_convert("UTC")
    )
    return group
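
The effect of the two DST options can be checked on a handful of timestamps. This is a small sketch using the 2016 US/Eastern transitions (13 March and 6 November); the four timestamps are chosen for illustration only.

```python
import pandas as pd

local = pd.Series(pd.to_datetime([
    "2016-03-13 01:00",  # ordinary hour before the spring transition
    "2016-03-13 02:00",  # does not exist: clocks jump from 02:00 to 03:00
    "2016-03-13 03:00",  # first valid hour after the jump
    "2016-11-06 01:00",  # occurs twice: clocks fall back from 02:00 to 01:00
]))

utc = (
    local
    .dt.tz_localize("US/Eastern", nonexistent="shift_forward", ambiguous="NaT")
    .dt.tz_convert("UTC")
)

print(pd.DataFrame({"local": local, "utc": utc}))
```

Two consequences are worth keeping in mind downstream: the shifted 02:00 reading lands on the same UTC instant as the genuine 03:00 reading, and the ambiguous autumn hour has no UTC timestamp at all.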

## Join Building Metadata  
Adds `timezone`, `site_id`, `primaryspaceusage`, and `sqm` fields from the metadata table for downstream analyses and weather merge.  

In [24]:
tz_info = pd.read_csv("../data/raw/metadata.csv")
tz_info = tz_info[["building_id", "timezone", "site_id", "primaryspaceusage","sqm"]]

complete_data = pd.merge(complete_data, tz_info, on="building_id", how="left")

In [25]:
tz_info.head()

Unnamed: 0,building_id,timezone,site_id,primaryspaceusage,sqm
0,Panther_lodging_Dean,US/Eastern,Panther,Lodging/residential,508.8
1,Panther_lodging_Shelia,US/Eastern,Panther,Lodging/residential,929.0
2,Panther_lodging_Ricky,US/Eastern,Panther,Lodging/residential,483.1
3,Panther_education_Rosalie,US/Eastern,Panther,Education,690.5
4,Panther_education_Misty,US/Eastern,Panther,Education,252.7


In [26]:

complete_data.head()

Unnamed: 0,timestamp,building_id,meter_reading,timezone,site_id,primaryspaceusage,sqm
0,2016-01-01 00:00:00,Panther_parking_Lorriane,0.0,US/Eastern,Panther,Parking,36012.7
1,2016-01-01 01:00:00,Panther_parking_Lorriane,0.0,US/Eastern,Panther,Parking,36012.7
2,2016-01-01 02:00:00,Panther_parking_Lorriane,0.0,US/Eastern,Panther,Parking,36012.7
3,2016-01-01 03:00:00,Panther_parking_Lorriane,0.0,US/Eastern,Panther,Parking,36012.7
4,2016-01-01 04:00:00,Panther_parking_Lorriane,0.0,US/Eastern,Panther,Parking,36012.7
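
A left join silently leaves metadata columns empty for any building that is missing from the metadata table, and such a building would carry a missing `timezone` into the UTC conversion. One way to surface this is the `indicator` option of `merge`. The sketch below uses made-up building names and is not a check the notebook itself performs.

```python
import pandas as pd

# Toy inputs: Building_X has readings but no metadata row.
readings = pd.DataFrame({
    "building_id": ["Building_A", "Building_A", "Building_X"],
    "meter_reading": [1.0, 2.0, 3.0],
})
metadata = pd.DataFrame({
    "building_id": ["Building_A", "Building_B"],
    "timezone": ["US/Eastern", "US/Pacific"],
})

merged = readings.merge(
    metadata,
    on="building_id",
    how="left",
    validate="many_to_one",  # fail early if metadata has repeated building_id values
    indicator=True,          # adds a _merge column: 'both' or 'left_only'
)

unmatched = merged.loc[merged["_merge"] == "left_only", "building_id"].unique()
print(unmatched)
```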


## Missing‑Data Diagnostics  
Quantifies overall and per‑building missingness to guide gap‑filling or filtering decisions.  

In [27]:
# Check how many values are present vs. missing
total_values = len(complete_data)
missing_values = complete_data['meter_reading'].isna().sum()
present_values = total_values - missing_values

print(f"Total rows: {total_values:,}")
print(f"Present values: {present_values:,} ({present_values/total_values*100:.2f}%)")
print(f"Missing values: {missing_values:,} ({missing_values/total_values*100:.2f}%)")

# Check missing data distribution by building
missing_by_building = (
    complete_data
    .groupby('building_id')['meter_reading']
    .apply(lambda x: x.isna().mean() * 100)
    .sort_values(ascending=False)
)

print("\nTop 10 buildings with highest percentage of missing values:")
print(missing_by_building.head(10))

print("\nBottom 10 buildings with lowest percentage of missing values:")
print(missing_by_building.tail(10))

print(f"\nAverage percentage of missing values across buildings: {missing_by_building.mean():.2f}%")

Total rows: 27,684,432
Present values: 26,372,337 (95.26%)
Missing values: 1,312,095 (4.74%)

Top 10 buildings with highest percentage of missing values:
building_id
Eagle_lodging_Garland      100.000000
Rat_public_Ulysses         100.000000
Bobcat_education_Barbra     99.373005
Bobcat_education_Seth       96.243730
Rat_education_Mac           94.795942
Rat_education_Angelica      91.375969
Rat_education_Kristie       91.102371
Rat_education_Chance        89.871181
Peacock_public_Linda        83.595531
Bobcat_public_Angie         83.008436
Name: meter_reading, dtype: float64

Bottom 10 buildings with lowest percentage of missing values:
building_id
Hog_education_Jordan      0.0
Hog_education_Jewel       0.0
Hog_education_Jared       0.0
Hog_education_Donnie      0.0
Hog_assembly_Dona         0.0
Hog_assembly_Colette      0.0
Hog_assembly_Arlie        0.0
Hog_assembly_Annemarie    0.0
Hog_education_Casandra    0.0
Hog_assembly_Una          0.0
Name: meter_reading, dtype: float64

Averag

In [28]:
complete_data.head()

Unnamed: 0,timestamp,building_id,meter_reading,timezone,site_id,primaryspaceusage,sqm
0,2016-01-01 00:00:00,Panther_parking_Lorriane,0.0,US/Eastern,Panther,Parking,36012.7
1,2016-01-01 01:00:00,Panther_parking_Lorriane,0.0,US/Eastern,Panther,Parking,36012.7
2,2016-01-01 02:00:00,Panther_parking_Lorriane,0.0,US/Eastern,Panther,Parking,36012.7
3,2016-01-01 03:00:00,Panther_parking_Lorriane,0.0,US/Eastern,Panther,Parking,36012.7
4,2016-01-01 04:00:00,Panther_parking_Lorriane,0.0,US/Eastern,Panther,Parking,36012.7
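
If the per-building missingness is used for filtering, the decision can be expressed as a threshold on that percentage. The sketch below runs on toy data; the 50% cutoff (`max_missing_pct`) is a hypothetical value, not one chosen in this notebook.

```python
import numpy as np
import pandas as pd

toy = pd.DataFrame({
    "building_id": ["A"] * 4 + ["B"] * 4,
    "meter_reading": [1.0, 2.0, 3.0, 4.0, np.nan, np.nan, np.nan, 5.0],
})

# Share of missing readings per building, in percent.
missing_pct = toy["meter_reading"].isna().groupby(toy["building_id"]).mean() * 100

max_missing_pct = 50.0  # hypothetical cutoff, for illustration only
keep_ids = missing_pct[missing_pct <= max_missing_pct].index

filtered = toy[toy["building_id"].isin(keep_ids)]
print(missing_pct)
print(filtered)
```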


## Duplicate Check  
Ensures each (`timestamp`, `building_id`) pair is unique, listing any problematic duplicates for inspection.  

In [29]:
# Check for duplicate (timestamp, building_id) pairs in complete_data
def check_merged_data_duplicates():
    # Count total rows
    total_rows = len(complete_data)
    
    # Count unique timestamp-building_id combinations
    unique_pairs = complete_data.drop_duplicates(subset=['timestamp', 'building_id']).shape[0]
    
    # Calculate duplicates
    duplicates = total_rows - unique_pairs
    
    print(f"\n=== Duplicate Analysis for merged_data ===")
    print(f"Total rows: {total_rows:,}")
    print(f"Unique timestamp-building_id pairs: {unique_pairs:,}")
    print(f"Duplicate pairs: {duplicates:,} ({duplicates/total_rows*100:.2f}% of rows)")
    
    # If duplicates exist, show the most frequent cases
    if duplicates > 0:
        # Count occurrences of each timestamp-building_id combination
        dupes = complete_data.groupby(['timestamp', 'building_id']).size().reset_index(name='count')
        dupes = dupes[dupes['count'] > 1].sort_values('count', ascending=False)
        
        print(f"\nTop 5 most duplicated timestamp-building_id combinations:")
        print(dupes.head(5))
        
        # Show example rows for the most duplicated combination
        if len(dupes) > 0:
            most_duplicated = dupes.iloc[0]
            ts = most_duplicated['timestamp']
            bldg = most_duplicated['building_id']
            
            print(f"\nExample rows for most duplicated combination:")
            print(f"timestamp: {ts}, building_id: {bldg}, count: {most_duplicated['count']}")
            print(complete_data[(complete_data['timestamp'] == ts) & 
                            (complete_data['building_id'] == bldg)].head(3))
    else:
        print("\nNo duplicates found in complete_data.")

# Run the check
check_merged_data_duplicates()


=== Duplicate Analysis for merged_data ===
Total rows: 27,684,432
Unique timestamp-building_id pairs: 27,684,432
Duplicate pairs: 0 (0.00% of rows)

No duplicates found in complete_data.
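
For a quick look at the offending rows themselves, `DataFrame.duplicated` with `keep=False` flags every member of a duplicated key. This is an alternative sketch on toy data, not the check used above.

```python
import pandas as pd

toy = pd.DataFrame({
    "timestamp": pd.to_datetime(
        ["2016-01-01 00:00", "2016-01-01 00:00", "2016-01-01 01:00"]
    ),
    "building_id": ["A", "A", "A"],
    "meter_reading": [1.0, 1.2, 2.0],
})

key = ["timestamp", "building_id"]

# keep=False marks all rows that share a key, not just the second and later ones.
dupe_mask = toy.duplicated(subset=key, keep=False)

print(f"Rows involved in duplicates: {int(dupe_mask.sum())}")
print(toy[dupe_mask].sort_values(key))
```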


## Apply Local→UTC Conversion  
* Applies the helper per building.  
* Keeps both local and UTC timestamps for flexibility.  
* Drops helper columns and prints a sample.  


In [30]:
# Apply timezone conversion to UTC and keep the original timestamp
complete_data_with_utc = (
    complete_data
        .groupby(["building_id"], group_keys=False)
        .apply(local_to_utc)   # Keep the current behavior
)

# Remove the timezone info from the UTC timestamp for easier use
complete_data_with_utc["timestamp_utc"] = complete_data_with_utc["timestamp_utc"].dt.tz_localize(None)

# Rename the original column to make clear that it holds local time
complete_data_with_utc = complete_data_with_utc.rename(columns={"timestamp": "timestamp_local"})

# Drop the now unnecessary columns if they exist
if "timestamp_utc_naive" in complete_data_with_utc.columns:
    complete_data_with_utc = complete_data_with_utc.drop(columns=["timestamp_utc_naive"])

# Now you have just two timestamp columns - local and UTC
print(f"Columns after conversion: {complete_data_with_utc.columns.tolist()}")
print(f"Sample timezone conversion:")
sample = complete_data_with_utc[['timestamp_local', 'timestamp_utc', 'timezone']].head(3)
print(sample)

Columns after conversion: ['timestamp_local', 'building_id', 'meter_reading', 'timezone', 'site_id', 'primaryspaceusage', 'sqm', 'timestamp_utc']
Sample timezone conversion:
      timestamp_local       timestamp_utc    timezone
0 2016-01-01 00:00:00 2016-01-01 05:00:00  US/Eastern
1 2016-01-01 01:00:00 2016-01-01 06:00:00  US/Eastern
2 2016-01-01 02:00:00 2016-01-01 07:00:00  US/Eastern


In [31]:

complete_data_with_utc.head()

Unnamed: 0,timestamp_local,building_id,meter_reading,timezone,site_id,primaryspaceusage,sqm,timestamp_utc
0,2016-01-01 00:00:00,Panther_parking_Lorriane,0.0,US/Eastern,Panther,Parking,36012.7,2016-01-01 05:00:00
1,2016-01-01 01:00:00,Panther_parking_Lorriane,0.0,US/Eastern,Panther,Parking,36012.7,2016-01-01 06:00:00
2,2016-01-01 02:00:00,Panther_parking_Lorriane,0.0,US/Eastern,Panther,Parking,36012.7,2016-01-01 07:00:00
3,2016-01-01 03:00:00,Panther_parking_Lorriane,0.0,US/Eastern,Panther,Parking,36012.7,2016-01-01 08:00:00
4,2016-01-01 04:00:00,Panther_parking_Lorriane,0.0,US/Eastern,Panther,Parking,36012.7,2016-01-01 09:00:00
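
Because many buildings share a timezone, the same conversion could also be grouped by `timezone` rather than `building_id`, which means far fewer groups. The sketch below is one possible variant, not the notebook's method: `add_utc_by_timezone` is a hypothetical helper, and it assumes the dataframe has a unique index so the converted pieces can be aligned back.

```python
import pandas as pd

def add_utc_by_timezone(df, ts_col="timestamp", tz_col="timezone"):
    """Hypothetical helper: localize once per distinct timezone, return naive UTC."""
    pieces = []
    for tz, grp in df.groupby(tz_col, sort=False):
        utc = (
            grp[ts_col]
            .dt.tz_localize(tz, nonexistent="shift_forward", ambiguous="NaT")
            .dt.tz_convert("UTC")
            .dt.tz_localize(None)  # drop tz info, keep UTC wall-clock values
        )
        pieces.append(utc)
    out = df.copy()
    # Column assignment aligns on the index; rows without a timezone stay NaT.
    out["timestamp_utc"] = pd.concat(pieces)
    return out

toy = pd.DataFrame({
    "timestamp": pd.to_datetime(["2016-01-01 00:00", "2016-01-01 00:00"]),
    "timezone": ["US/Eastern", "Europe/London"],
})
result = add_utc_by_timezone(toy)
print(result)
```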


## Merge Hourly Weather  
* Loads hourly outdoor air temperature (`airTemperature`) by `site_id`.  
* Merges on `timestamp_local` and `site_id`.  
* Falls back to metadata join if `site_id` wasn’t already present.  


In [32]:
# Load weather data
weather_data = pd.read_csv("../data/raw/weather.csv", 
                          usecols=['timestamp', 'site_id', 'airTemperature'])
# Parse timestamps as local time (not UTC)
weather_data['timestamp'] = pd.to_datetime(weather_data['timestamp'])

# Check if complete_data_with_utc already has site_id column(s)
site_id_col = 'site_id' if 'site_id' in complete_data_with_utc.columns else ('site_id_x' if 'site_id_x' in complete_data_with_utc.columns else None)

# Merge with weather data using appropriate site_id column
# Note: Using timestamp_local instead of timestamp_utc since weather data is in local time
if site_id_col:
    # Already has site_id, use it directly for weather merge
    meter_with_temp = complete_data_with_utc.merge(
        weather_data[['timestamp', 'site_id', 'airTemperature']],
        left_on=['timestamp_local', site_id_col],
        right_on=['timestamp', 'site_id'],
        how='left',
        validate='many_to_one'
    )
    # Clean up by removing the duplicate timestamp column from weather data
    if 'timestamp' in meter_with_temp.columns:
        meter_with_temp = meter_with_temp.drop(columns=['timestamp'])
else:
    # No site_id, need to get it from metadata first
    # (same metadata file as loaded earlier in the notebook)
    meta_data = pd.read_csv("../data/raw/metadata.csv",
                          usecols=['building_id', 'site_id', 'primaryspaceusage'])
    
    # Add site_id information to meter data
    meter_with_temp = complete_data_with_utc.merge(
        meta_data[['building_id', 'site_id']],
        on='building_id',
        how='left'
    ).merge(
        weather_data[['timestamp', 'site_id', 'airTemperature']],
        left_on=['timestamp_local', 'site_id'],
        right_on=['timestamp', 'site_id'],
        how='left',
        validate='many_to_one'
    )
    # Clean up by removing the duplicate timestamp column from weather data
    if 'timestamp' in meter_with_temp.columns:
        meter_with_temp = meter_with_temp.drop(columns=['timestamp'])

In [33]:
meter_with_temp.head()

Unnamed: 0,timestamp_local,building_id,meter_reading,timezone,site_id,primaryspaceusage,sqm,timestamp_utc,airTemperature
0,2016-01-01 00:00:00,Panther_parking_Lorriane,0.0,US/Eastern,Panther,Parking,36012.7,2016-01-01 05:00:00,19.4
1,2016-01-01 01:00:00,Panther_parking_Lorriane,0.0,US/Eastern,Panther,Parking,36012.7,2016-01-01 06:00:00,21.1
2,2016-01-01 02:00:00,Panther_parking_Lorriane,0.0,US/Eastern,Panther,Parking,36012.7,2016-01-01 07:00:00,21.1
3,2016-01-01 03:00:00,Panther_parking_Lorriane,0.0,US/Eastern,Panther,Parking,36012.7,2016-01-01 08:00:00,20.6
4,2016-01-01 04:00:00,Panther_parking_Lorriane,0.0,US/Eastern,Panther,Parking,36012.7,2016-01-01 09:00:00,21.1
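
The `validate='many_to_one'` argument makes the merge fail if the weather table contains more than one row per (`timestamp`, `site_id`) key, which would otherwise multiply meter rows. The sketch below shows that behaviour on toy data with a made-up `Site_A`; deduplicating with `keep="first"` is only one possible remedy and is an assumption here, as is the match-rate check at the end.

```python
import pandas as pd
from pandas.errors import MergeError

meters = pd.DataFrame({
    "timestamp_local": pd.to_datetime(["2016-01-01 00:00", "2016-01-01 01:00"]),
    "site_id": ["Site_A", "Site_A"],
    "meter_reading": [1.0, 2.0],
})
# Toy weather table with a repeated key and no row for 01:00.
weather = pd.DataFrame({
    "timestamp": pd.to_datetime(["2016-01-01 00:00", "2016-01-01 00:00"]),
    "site_id": ["Site_A", "Site_A"],
    "airTemperature": [19.4, 19.6],
})

merge_kwargs = dict(
    left_on=["timestamp_local", "site_id"],
    right_on=["timestamp", "site_id"],
    how="left",
    validate="many_to_one",
)

try:
    meters.merge(weather, **merge_kwargs)
    raised = False
except MergeError as exc:
    raised = True
    print(f"Merge rejected: {exc}")

# One possible fix: keep a single weather row per key, then merge again.
weather_unique = weather.drop_duplicates(subset=["timestamp", "site_id"], keep="first")
merged = meters.merge(weather_unique, **merge_kwargs).drop(columns=["timestamp"])

# Share of meter rows that received a temperature value.
match_rate = merged["airTemperature"].notna().mean()
print(f"Rows with a temperature: {match_rate:.0%}")
```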


## Export  
Writes the merged table to a Snappy‑compressed Parquet file.  

In [34]:
# Save the hourly per-building readings with air temperature
meter_with_temp.to_parquet(
    path_combined + "meter_with_temp.parquet",
    compression="snappy",
    index=False
)
print(f"Saved hourly building totals to {path_combined}meter_with_temp.parquet")



Saved hourly building totals to ../data/combinedmeters/meter_with_temp.parquet
