<a href="https://colab.research.google.com/github/dannynew111/erp-clustering-smart-meter-data/blob/main/data_ingestion_v2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import pandas as pd
import os
import gc # Garbage Collector
import pyarrow as pa
import pyarrow.parquet as pq
import os

# Define Paths
data_raw_dir = '/content/drive/MyDrive/ERP/data_raw/'
data_loaded_dir = '/content/drive/MyDrive/ERP/data_loaded/'

# Input files
consumption_d_path = os.path.join(data_raw_dir, 'consumption_d.csv')
consumption_n_path = os.path.join(data_raw_dir, 'consumption_n.csv')
tariff_d_path = os.path.join(data_raw_dir, 'tariff_d.csv')

# NEW Output files
output_file_2012 = os.path.join(data_loaded_dir, 'consumption_2012_v2.parquet')
output_file_2013 = os.path.join(data_loaded_dir, 'consumption_2013_v2.parquet')

# Initialise Parquet Writers
writer_2012 = None
writer_2013 = None

print("Setup complete. Ready to process in chunks.")

Setup complete. Ready to process in chunks.


In [3]:
# DIAGNOSTIC: Count relevant Missing Values in my Raw Data
# This script is for my ERP report so I can quote missingness (limitation)

import pandas as pd
import os

print(" Starting Advanced Raw Data Diagnostic ")

#  Step 1: Consumption Files (with Date Filtering)
files_to_check = {
    'Treatment Group (consumption_d.csv)': consumption_d_path,
    'Control Group (consumption_n.csv)': consumption_n_path
}

total_missing_consumption = 0
total_consumption_cells_in_scope = 0
chunk_size = 100

for description, filepath in files_to_check.items():
    print(f"\nProcessing file: {description}")

    try:
        all_cols = pd.read_csv(filepath, nrows=0).columns.tolist()
        household_cols = all_cols[1:]

        file_missing_count = 0
        file_cells_in_scope = 0

        # Create an iterator to process the file in chunks
        chunk_iterator = pd.read_csv(filepath, chunksize=chunk_size, usecols=['GMT'] + household_cols)

        for i, df_chunk_wide in enumerate(chunk_iterator):
            # Convert GMT column to datetime objects to enable date filtering
            df_chunk_wide['GMT'] = pd.to_datetime(df_chunk_wide['GMT'], errors='coerce')

            # Create a mask for the relevant date range (2012-2013)
            date_mask = (df_chunk_wide['GMT'].dt.year >= 2012) & (df_chunk_wide['GMT'].dt.year <= 2013)

            # Filter the chunk to ONLY include rows within our analytical scope
            df_chunk_in_scope = df_chunk_wide[date_mask]

            if not df_chunk_in_scope.empty:
                # Count nulls ONLY on the in-scope data
                chunk_missing_count = df_chunk_in_scope[household_cols].isnull().sum().sum()
                file_missing_count += chunk_missing_count

                # Count the number of cells ONLY from the in-scope data
                file_cells_in_scope += df_chunk_in_scope[household_cols].size

        print(f"  Found {file_missing_count:,} missing values within the 2012-2013 period.")
        total_missing_consumption += file_missing_count
        total_consumption_cells_in_scope += file_cells_in_scope

    except FileNotFoundError:
        print(f"  ERROR: Could not find file at path: {filepath}. Skipping.")

print("\n Consumption Data Summary (In-Scope Only) ")
print(f"Total relevant missing consumption values (2012-2013): {total_missing_consumption:,}")
if total_consumption_cells_in_scope > 0:
    print(f"Total relevant data cells (readings): {total_consumption_cells_in_scope:,}")
    print(f"Percentage of RELEVANT missing consumption data: {(total_missing_consumption / total_consumption_cells_in_scope):.4%}")



 Starting Advanced Raw Data Diagnostic 

Processing file: Treatment Group (consumption_d.csv)
  Found 7,176,747 missing values within the 2012-2013 period.

Processing file: Control Group (consumption_n.csv)
  Found 30,949,229 missing values within the 2012-2013 period.

 Consumption Data Summary (In-Scope Only) 
Total relevant missing consumption values (2012-2013): 38,125,976
Total relevant data cells (readings): 182,387,424
Percentage of RELEVANT missing consumption data: 20.9038%


In [4]:
def process_file_in_chunks(filepath, group_label, chunk_size=100):
    global writer_2012, writer_2013


    # Load and prepare the tariff data INSIDE the function
    df_tariff = pd.read_csv(tariff_d_path)
    df_tariff.rename(columns={'GMT': 'DateTime'}, inplace=True)
    df_tariff['DateTime'] = pd.to_datetime(df_tariff['DateTime'])

    print(f"\n Processing {group_label} Group from {os.path.basename(filepath)} ")

    all_cols = pd.read_csv(filepath, nrows=0).columns.tolist()
    household_cols = all_cols[1:]

    for i in range(0, len(household_cols), chunk_size):
        cols_to_read = ['GMT'] + household_cols[i:i+chunk_size]
        df_chunk_wide = pd.read_csv(filepath, usecols=cols_to_read)

        df_chunk_long = pd.melt(df_chunk_wide, id_vars=['GMT'], var_name='Household_id', value_name='energy_kwh')
        df_chunk_long['stdorToU'] = group_label

        df_chunk_long.rename(columns={'GMT': 'DateTime'}, inplace=True)
        df_chunk_long['DateTime'] = pd.to_datetime(df_chunk_long['DateTime'])
        df_chunk_long['energy_kwh'] = pd.to_numeric(df_chunk_long['energy_kwh'], errors='coerce')
        df_chunk_long.dropna(inplace=True)

        # MERGE
        df_chunk_long = pd.merge(df_chunk_long, df_tariff, on='DateTime', how='left')

        df_chunk_long['year'] = df_chunk_long['DateTime'].dt.year
        df_2012_chunk = df_chunk_long[df_chunk_long['year'] == 2012]
        df_2013_chunk = df_chunk_long[df_chunk_long['year'] == 2013]

        if not df_2012_chunk.empty:
            table_2012 = pa.Table.from_pandas(df_2012_chunk, preserve_index=False)
            if writer_2012 is None:
                writer_2012 = pq.ParquetWriter(output_file_2012, table_2012.schema)
            writer_2012.write_table(table=table_2012)

        if not df_2013_chunk.empty:
            table_2013 = pa.Table.from_pandas(df_2013_chunk, preserve_index=False)
            if writer_2013 is None:
                writer_2013 = pq.ParquetWriter(output_file_2013, table_2013.schema)
            writer_2013.write_table(table=table_2013)

        print(f"  Processed and appended chunk {i//chunk_size + 1} for {group_label} group...")
        gc.collect()

# Run the processing for both files
process_file_in_chunks(consumption_d_path, 'ToU')
process_file_in_chunks(consumption_n_path, 'Std')

# Close the writers to finalize the files
if writer_2012:
    writer_2012.close()
if writer_2013:
    writer_2013.close()

print(f"\nSuccessfully created new Parquet files:\n- {output_file_2012}\n- {output_file_2013}")


 Processing ToU Group from consumption_d.csv 
  Processed and appended chunk 1 for ToU group...
  Processed and appended chunk 2 for ToU group...
  Processed and appended chunk 3 for ToU group...
  Processed and appended chunk 4 for ToU group...
  Processed and appended chunk 5 for ToU group...
  Processed and appended chunk 6 for ToU group...
  Processed and appended chunk 7 for ToU group...
  Processed and appended chunk 8 for ToU group...
  Processed and appended chunk 9 for ToU group...
  Processed and appended chunk 10 for ToU group...
  Processed and appended chunk 11 for ToU group...

 Processing Std Group from consumption_n.csv 
  Processed and appended chunk 1 for Std group...
  Processed and appended chunk 2 for Std group...
  Processed and appended chunk 3 for Std group...
  Processed and appended chunk 4 for Std group...
  Processed and appended chunk 5 for Std group...
  Processed and appended chunk 6 for Std group...
  Processed and appended chunk 7 for Std group...
  Pr

In [5]:
# Check to make sure Parquet files are correct
# This cell will load and inspect the newly created Parquet files
# to confirm they are correct in structure and content

# Script should prove the existence of both 'ToU' and 'Std' groups
# by reading the first and last blocks of data from the Parquet file.


# Only need to check the 2013 file as it's the larger one
output_file_2013 = '/content/drive/MyDrive/ERP/data_loaded/consumption_2013_v2.parquet'

print(f" Definitive Verification of: {os.path.basename(output_file_2013)} ")

try:
    parquet_file = pq.ParquetFile(output_file_2013)

    # Get the total number of row groups (blocks) in the file
    num_row_groups = parquet_file.num_row_groups
    print(f"File contains {num_row_groups} row groups (blocks).")

    # Preview the FIRST row group
    print("\nPreview of the FIRST block of data:")
    first_group = parquet_file.read_row_group(0).to_pandas()
    display(first_group.head())
    print(f"Unique groups found in first block: {first_group['stdorToU'].unique()}")

    # Preview the LAST row group
    print("\nPreview of the LAST block of data:")
    last_group = parquet_file.read_row_group(num_row_groups - 1).to_pandas()
    display(last_group.head())
    print(f"Unique groups found in last block: {last_group['stdorToU'].unique()}")

    print("\nThe presence of both 'ToU' and 'Std' groups is confirmed.")

except Exception as e:
    print(f"An error occurred: {e}")


 Definitive Verification of: consumption_2013_v2.parquet 
File contains 105 row groups (blocks).

Preview of the FIRST block of data:


Unnamed: 0,DateTime,Household_id,energy_kwh,stdorToU,Price,Event_tags,year
0,2013-01-01 00:00:00,D0000,1.043,ToU,,,2013
1,2013-01-01 00:30:00,D0000,0.404,ToU,0.1176,,2013
2,2013-01-01 01:00:00,D0000,0.185,ToU,0.1176,,2013
3,2013-01-01 01:30:00,D0000,0.151,ToU,0.1176,,2013
4,2013-01-01 02:00:00,D0000,0.139,ToU,0.1176,,2013


Unique groups found in first block: ['ToU']

Preview of the LAST block of data:


Unnamed: 0,DateTime,Household_id,energy_kwh,stdorToU,Price,Event_tags,year
0,2013-01-20 11:00:00,N4163,0.287,Std,0.0399,CM,2013
1,2013-01-20 11:30:00,N4163,0.265,Std,0.0399,CM,2013
2,2013-01-20 12:00:00,N4163,0.282,Std,0.0399,CM,2013
3,2013-01-20 12:30:00,N4163,0.301,Std,0.0399,CM,2013
4,2013-01-20 13:00:00,N4163,0.288,Std,0.0399,CM,2013


Unique groups found in last block: ['Std']

The presence of both 'ToU' and 'Std' groups is confirmed.
