In [22]:
# 2.0 — Clean column names and data types for electricity demand (chunk)
import pandas as pd
import numpy as np
from pathlib import Path
from IPython.display import display

def clean_electricity_data_chunk(chunk):
    """Clean a chunk/dataframe of electricity demand records.

    Actions:
    - normalize column names (lowercase, underscores)
    - parse a timestamp-like column to datetime if present
    - extract date/time features (year, month, day, hour, minute)
    - coerce demand/load columns to numeric (float64)
    - strip text columns (missing values stay null) and preserve provenance

    The function enforces consistent dtypes for numeric columns (float64) and uses
    pandas nullable `Int64` for integer-like date fields to reduce parquet schema
    mismatches across chunks.

    Parameters
    ----------
    chunk : pandas.DataFrame
        Raw records, e.g. one ``pd.read_csv(..., chunksize=...)`` chunk. The input
        is not modified; a cleaned copy is returned.

    Returns
    -------
    pandas.DataFrame
        Cleaned copy with normalized column names plus the derived columns
        ``year``, ``month``, ``day``, ``hour``, ``minute`` (Int64), ``date``
        (text), ``demand_value`` (float64) and ``source_file``.
    """
    df = chunk.copy()
    date_parts = ['year', 'month', 'day', 'hour', 'minute']

    def _is_missing(value):
        """Return True for scalar nulls (None, NaN, NaT, pd.NA)."""
        try:
            return bool(pd.isna(value))
        except (TypeError, ValueError):
            # list-like cell values: pd.isna returns an array -> treat as present
            return False

    def _as_clean_text(value):
        """Strip strings, stringify other present values, keep nulls as nulls."""
        if isinstance(value, str):
            return value.strip()
        return value if _is_missing(value) else str(value).strip()

    # 1) Normalize column names
    def _clean_colname(c):
        c = str(c).strip()
        c = c.replace(" ", "_")
        c = c.replace("/", "_")
        c = c.replace("-", "_")
        c = c.replace("(", "_").replace(")", "_")
        # lowercase and collapse repeated underscores
        c = "_".join([p for p in c.lower().split("_") if p])
        return c

    df.columns = [_clean_colname(c) for c in df.columns]

    # 2) Identify a timestamp column. Keywords are tried in priority order, so a
    # real date column (e.g. `settlement_date`) wins over columns that merely
    # share a prefix (e.g. `settlement_period`), regardless of column order.
    ts_col = None
    for keyword in ('timestamp', 'datetime', 'date', 'time', 'settlement'):
        matches = [c for c in df.columns if keyword in c]
        if matches:
            ts_col = matches[0]
            break

    if ts_col is not None:
        # parse datetimes defensively: unparseable values become NaT
        df[ts_col] = pd.to_datetime(df[ts_col], errors='coerce')
        parts = df[ts_col].dt
        # use pandas nullable integer dtype for date parts
        df['year'] = parts.year.astype('Int64')
        df['month'] = parts.month.astype('Int64')
        df['day'] = parts.day.astype('Int64')
        df['hour'] = parts.hour.astype('Int64')
        df['minute'] = parts.minute.astype('Int64')
        df['date'] = parts.date
    else:
        # Build fillers on the frame's own index so they never misalign with
        # chunks whose index does not start at 0.
        for part in date_parts:
            df[part] = pd.Series(pd.NA, index=df.index, dtype='Int64')
        df['date'] = pd.Series(pd.NA, index=df.index, dtype=object)

    # 3) Coerce obvious numeric demand/load columns to numeric (float64)
    demand_candidates = [c for c in df.columns if any(k in c for k in ['demand', 'load', 'mw', 'mwh', 'kw', 'kwh', 'total', 'value'])]

    for c in demand_candidates:
        df[c] = pd.to_numeric(df[c], errors='coerce').astype('float64')

    # Also coerce other numeric-like object columns to float64 for a consistent schema
    skip = set(demand_candidates) | set(date_parts)
    for c in df.columns:
        if c in skip or df[c].dtype != object:
            continue
        nonnull = df[c].dropna()
        if len(nonnull) == 0:
            continue
        # Heuristic: coerce when >50% of non-null values parse as numbers *as
        # written*. Do not strip non-digit characters first: that makes text such
        # as "abc123" or "12%" look numeric, and the coercion would then replace
        # the whole column with NaN.
        parsed = pd.to_numeric(nonnull.astype(str), errors='coerce')
        if parsed.notna().mean() > 0.5:
            df[c] = pd.to_numeric(df[c], errors='coerce').astype('float64')

    # 4) Normalise text columns: strip strings; keep missing values missing
    # (astype(str) would turn them into the literal text 'nan'/'None'/'NaT').
    for col in df.select_dtypes(include=['object']).columns:
        df[col] = df[col].map(_as_clean_text)

    # 5) Keep a stable primary demand column if found, and create `demand_value` alias
    primary_demand = next((c for c in demand_candidates if 'demand' in c), None)
    if primary_demand is None and demand_candidates:
        primary_demand = demand_candidates[0]

    if primary_demand is not None:
        df['demand_value'] = df[primary_demand]
    else:
        df['demand_value'] = pd.Series(np.nan, index=df.index, dtype='float64')

    # 6) Preserve provenance; missing entries stay null rather than the text 'nan'
    if 'source_file' not in df.columns:
        df['source_file'] = pd.Series(pd.NA, index=df.index, dtype=object)
    else:
        df['source_file'] = df['source_file'].map(lambda v: v if _is_missing(v) else str(v))

    # Ensure consistent dtypes: cast numeric columns to float64 and date parts to Int64
    for c in df.select_dtypes(include=['number']).columns:
        df[c] = df[c].astype('float64')

    for dcol in date_parts:
        if dcol in df.columns:
            df[dcol] = df[dcol].astype('Int64')

    return df

# Smoke-test the cleaner on whatever data is at hand. Preference order: the full
# in-memory read (`df_all`, first 1000 rows), then `df_sample`, and finally the
# first 500 rows of the first CSV found in the dataset folder.
df_test = None
if 'df_all' in globals():
    df_test = df_all.head(1000)
    print("Testing `clean_electricity_data_chunk` on `df_all.head(1000)`")
elif 'df_sample' in globals():
    df_test = df_sample
    print("Testing `clean_electricity_data_chunk` on `df_sample`")
else:
    # nothing loaded in memory: read a small slice straight from disk
    data_dir = Path("../Dataset_2_UK_Historic_Electricity_Demand_Data")
    csv_files = sorted(data_dir.glob("*.csv"))
    if not csv_files:
        print("No data available to test the cleaning function.")
    else:
        print(f"Reading a small fallback sample from {csv_files[0].name}")
        df_test = pd.read_csv(csv_files[0], nrows=500)

# Run the cleaner and show a before/after summary of the columns.
if df_test is not None:
    df_cleaned_sample = clean_electricity_data_chunk(df_test)
    print("‚úÖ Cleaning function created and tested!")
    print(f"Original columns: {list(df_test.columns[:20])}")
    print(f"New columns: {list(df_cleaned_sample.columns[:30])}")
    print(f"New shape: {df_cleaned_sample.shape}")
    display(df_cleaned_sample.head())


Reading a small fallback sample from demanddata_2001.csv
‚úÖ Cleaning function created and tested!
Original columns: ['SETTLEMENT_DATE', 'SETTLEMENT_PERIOD', 'ND', 'TSD', 'ENGLAND_WALES_DEMAND', 'EMBEDDED_WIND_GENERATION', 'EMBEDDED_WIND_CAPACITY', 'EMBEDDED_SOLAR_GENERATION', 'EMBEDDED_SOLAR_CAPACITY', 'NON_BM_STOR', 'PUMP_STORAGE_PUMPING', 'SCOTTISH_TRANSFER', 'IFA_FLOW', 'IFA2_FLOW', 'BRITNED_FLOW', 'MOYLE_FLOW', 'EAST_WEST_FLOW', 'NEMO_FLOW', 'NSL_FLOW', 'ELECLINK_FLOW']
New columns: ['settlement_date', 'settlement_period', 'nd', 'tsd', 'england_wales_demand', 'embedded_wind_generation', 'embedded_wind_capacity', 'embedded_solar_generation', 'embedded_solar_capacity', 'non_bm_stor', 'pump_storage_pumping', 'scottish_transfer', 'ifa_flow', 'ifa2_flow', 'britned_flow', 'moyle_flow', 'east_west_flow', 'nemo_flow', 'nsl_flow', 'eleclink_flow', 'viking_flow', 'greenlink_flow', 'year', 'month', 'day', 'hour', 'minute', 'date', 'demand_value', 'source_file']
New shape: (500, 30)


Unnamed: 0,settlement_date,settlement_period,nd,tsd,england_wales_demand,embedded_wind_generation,embedded_wind_capacity,embedded_solar_generation,embedded_solar_capacity,non_bm_stor,...,viking_flow,greenlink_flow,year,month,day,hour,minute,date,demand_value,source_file
0,2001-01-01,1.0,38631.0,,34060.0,,,,,0.0,...,,,2001,1,1,0,0,2001-01-01,34060.0,
1,2001-01-01,2.0,39808.0,,35370.0,,,,,0.0,...,,,2001,1,1,0,0,2001-01-01,35370.0,
2,2001-01-01,3.0,40039.0,,35680.0,,,,,0.0,...,,,2001,1,1,0,0,2001-01-01,35680.0,
3,2001-01-01,4.0,39339.0,,35029.0,,,,,0.0,...,,,2001,1,1,0,0,2001-01-01,35029.0,
4,2001-01-01,5.0,38295.0,,34047.0,,,,,0.0,...,,,2001,1,1,0,0,2001-01-01,34047.0,


In [23]:
# 2.1 Create strategic subset for electricity (streaming -> parquet) with reservoir sampling
from pathlib import Path
import pandas as pd
import random
import math


def create_strategic_subset_elec(csv_dir,
                                 start_year=2001,
                                 end_year=2025,
                                 out_dir='data/interim',
                                 sample_n=500,
                                 chunksize=200_000,
                                 parquet_engine_preference=('pyarrow', 'fastparquet')):
    """Stream all CSVs in `csv_dir`, clean chunks using `clean_electricity_data_chunk`,
    write a cleaned full dataset.

    Uses reservoir sampling during streaming to produce unbiased sample (memory-bounded).
    Now with schema standardization to handle column evolution across years.

    Parameters
    ----------
    csv_dir : str or Path
        Folder whose ``*.csv`` files (non-recursive glob) are processed in sorted order.
    start_year, end_year : int
        Currently unused: every row of every file is written. Kept for API compatibility.
    out_dir : str or Path
        Output folder (created if missing) for ``elec_cleaned_full.parquet``,
        ``elec_cleaned_full.csv`` and ``elec_cleaned_full_sample.csv``.
    sample_n : int
        Size of the uniform reservoir sample.
    chunksize : int
        Rows per ``pd.read_csv`` chunk.
    parquet_engine_preference : tuple of str
        Engines probed in order. Incremental Parquet writing is implemented
        only for ``pyarrow``.

    Returns
    -------
    dict
        ``{}`` when no CSV file is found. Otherwise it has these keys:
        ``full_parquet`` (path, or None when no Parquet file exists),
        ``full_csv``, ``full_sample`` and ``rows_written``.

    Notes
    -----
    If both full outputs already exist, nothing is re-streamed. In that case an
    existing sample file is left untouched rather than overwritten with an
    empty one.
    """
    import importlib
    import numpy as np

    csv_dir = Path(csv_dir)
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    full_stem = 'elec_cleaned_full'
    full_parquet = out_dir / f"{full_stem}.parquet"
    full_csv_out = out_dir / f"{full_stem}.csv"
    full_sample_path = out_dir / f"{full_stem}_sample.csv"

    # Parquet engine detection: first importable engine in preference order
    parquet_engine = None
    for eng in parquet_engine_preference:
        try:
            importlib.import_module(eng)
        except Exception:
            continue
        parquet_engine = eng
        break

    use_parquet = parquet_engine is not None

    # Setup pyarrow writers if available
    parquet_writer_full = None
    pa = None
    pq = None
    if use_parquet and parquet_engine == 'pyarrow':
        try:
            import pyarrow as pa
            import pyarrow.parquet as pq
        except Exception:
            use_parquet = False

    total_written = 0
    first_write_csv = True
    streamed = False  # set once Phase 2 runs, i.e. the reservoir reflects the data

    # Pre-check existence
    parquet_exists = full_parquet.exists() if use_parquet else False
    csv_exists = full_csv_out.exists()

    if parquet_exists and csv_exists:
        print(f"Full cleaned outputs already exist; will skip writing:")
        print(f"  - Parquet: {full_parquet}")
        print(f"  - CSV: {full_csv_out}")
        full_exists = True
    else:
        full_exists = False
        if parquet_exists:
            print(f"Parquet exists but CSV missing. Will create CSV: {full_csv_out}")
        if csv_exists:
            print(f"CSV exists but Parquet missing. Will create Parquet: {full_parquet}")

    # Reservoir for streaming unbiased sample
    full_reservoir = []
    full_count = 0

    # ===== SCHEMA DETECTION: Scan all files to get complete column set =====
    all_columns = set()

    csv_files = sorted(csv_dir.glob('*.csv'))
    if not csv_files:
        print(f"No CSV files found in {csv_dir}")
        return {}

    if not full_exists:
        print("üîç Phase 1: Detecting complete schema across all years...")
        for fpath in csv_files:
            try:
                # Read just first rows to get the cleaned column names
                sample_chunk = pd.read_csv(fpath, nrows=100, low_memory=False)
                sample_cleaned = clean_electricity_data_chunk(sample_chunk)
                all_columns.update(sample_cleaned.columns)
            except Exception as exc:
                print(f"Warning: could not scan {fpath.name}: {exc}")

        # Sort columns for consistent ordering
        all_columns = sorted(all_columns)
        print(f"‚úÖ Detected {len(all_columns)} unique columns across all years")

        print("\n‚öôÔ∏è Phase 2: Processing and writing data with standardized schema...")
        streamed = True

        try:
            # Iterate all CSV files and stream their chunks
            for fpath in csv_files:
                try:
                    for i, chunk in enumerate(pd.read_csv(fpath, chunksize=chunksize, low_memory=False)):
                        chunk_cleaned = clean_electricity_data_chunk(chunk)

                        # ===== SCHEMA STANDARDIZATION: Add missing columns with proper dtype =====
                        for col in all_columns:
                            if col not in chunk_cleaned.columns:
                                if col in ['year', 'month', 'day', 'hour', 'minute']:
                                    chunk_cleaned[col] = pd.Series([pd.NA] * len(chunk_cleaned), dtype='Int64')
                                elif col in ['date', 'settlement_date', 'source_file']:
                                    chunk_cleaned[col] = None  # Will be treated as null strings
                                else:
                                    # Use numpy nan for float columns (PyArrow compatible)
                                    chunk_cleaned[col] = np.nan

                        # Reorder columns to match master schema
                        chunk_cleaned = chunk_cleaned[all_columns]

                        # ===== write to full cleaned output (parquet and CSV) =====
                        try:
                            # Write Parquet
                            if use_parquet and parquet_engine == 'pyarrow' and not parquet_exists:
                                table_full = pa.Table.from_pandas(chunk_cleaned)
                                if parquet_writer_full is None:
                                    # Establish the file schema from the first standardized chunk
                                    parquet_writer_full = pq.ParquetWriter(str(full_parquet), table_full.schema)
                                parquet_writer_full.write_table(table_full)

                            # Write CSV (complete dataset) with proper type conversion
                            if not csv_exists:
                                # Rebuild from records so pandas re-infers dtypes (matches the sample CSV)
                                chunk_csv = pd.DataFrame(chunk_cleaned.to_dict('records'))

                                # Convert date objects to strings if present
                                if 'date' in chunk_csv.columns and chunk_csv['date'].dtype == object:
                                    chunk_csv['date'] = chunk_csv['date'].astype(str)
                                if 'settlement_date' in chunk_csv.columns and chunk_csv['settlement_date'].dtype == object:
                                    chunk_csv['settlement_date'] = chunk_csv['settlement_date'].astype(str)

                                if first_write_csv:
                                    chunk_csv.to_csv(full_csv_out, index=False, mode='w', na_rep='NaN')
                                    first_write_csv = False
                                else:
                                    chunk_csv.to_csv(full_csv_out, index=False, header=False, mode='a', na_rep='NaN')

                            total_written += len(chunk_cleaned)
                        except Exception as e:
                            print(f'Warning: failed to write chunk from {fpath.name}:', e)

                        # ===== update reservoir (Algorithm R) =====
                        # Row number `full_count` (1-based) replaces a uniformly chosen slot
                        # with probability sample_n / full_count. Only rows that are kept
                        # are materialized as dicts, instead of building a Series per row.
                        for pos in range(len(chunk_cleaned)):
                            full_count += 1
                            if len(full_reservoir) < sample_n:
                                full_reservoir.append(chunk_cleaned.iloc[pos].to_dict())
                            else:
                                slot = random.randint(1, full_count)
                                if slot <= sample_n:
                                    full_reservoir[slot - 1] = chunk_cleaned.iloc[pos].to_dict()

                        # periodic progress
                        if (i + 1) % 10 == 0:
                            print(f"Processed chunk {i+1} of file {fpath.name}. Total rows so far: {total_written}")
                except Exception as exc:
                    print(f"Warning: failed to stream {fpath.name}: {exc}")
        finally:
            # Close the writer even if streaming is interrupted (e.g. KeyboardInterrupt),
            # so the Parquet footer gets written.
            if parquet_writer_full is not None:
                try:
                    parquet_writer_full.close()
                except Exception as exc:
                    print(f"Warning: failed to close Parquet writer: {exc}")

        print(f"\n‚úÖ Phase 2 complete: Processed all {len(csv_files)} files")

    # Persist reservoir sample to CSV
    if not streamed and full_sample_path.exists():
        # Outputs were reused without streaming, so the reservoir is empty; keep
        # the sample produced by the earlier run instead of blanking it.
        print(f"Keeping existing sample file: {full_sample_path}")
    else:
        try:
            if full_reservoir:
                pd.DataFrame(full_reservoir).head(sample_n).to_csv(full_sample_path, index=False, na_rep='NaN')
            else:
                pd.DataFrame().to_csv(full_sample_path, index=False, na_rep='NaN')
        except Exception as e:
            print('Warning: failed to persist reservoir sample due to:', e)
            pd.DataFrame().to_csv(full_sample_path, index=False, na_rep='NaN')

    result = {
        # Only report a Parquet path that actually exists (e.g. not on the
        # fastparquet path, which never writes one).
        'full_parquet': str(full_parquet) if use_parquet and full_parquet.exists() else None,
        'full_csv': str(full_csv_out),
        'full_sample': str(full_sample_path),
        'rows_written': int(total_written)
    }

    print(f"Done. Total rows written: {total_written}")
    if result['full_parquet']:
        print(f"Full cleaned Parquet file: {result['full_parquet']}")
    print(f"Full cleaned CSV file: {result['full_csv']}")
    print(f"Sample file: {result['full_sample']}")

    return result


# Announce that the streaming ETL helper is defined and ready to run.
print(
    'create_strategic_subset_elec ready: streams all CSVs in folder, '
    'writes cleaned full dataset (parquet preferred), '
    'and creates unbiased sample (reservoir sampling).'
)

create_strategic_subset_elec ready: streams all CSVs in folder, writes cleaned full dataset (parquet preferred), and creates unbiased sample (reservoir sampling).


In [25]:
# 2.2 Runner: execute the streaming ETL for electricity and verify outputs
# Run this cell after `clean_electricity_data_chunk` and `create_strategic_subset_elec`
from pathlib import Path

# Run the full ETL, then report which output files exist and list the interim folder.
try:
    print("Starting create_strategic_subset_elec for full dataset (this will stream and write files)...")
    res = create_strategic_subset_elec("../Dataset_2_UK_Historic_Electricity_Demand_Data")
    print('\nresult:', res)

    # Check every output the ETL reports. The ETL writes the CSV whether or not a
    # Parquet file is produced, so both are verified (alongside the sample). An
    # empty result ({} when no CSVs were found) yields no paths.
    out_paths = [Path(res[key]) for key in ('full_parquet', 'full_csv', 'full_sample') if res.get(key)]

    exist_map = {str(p): p.exists() for p in out_paths}
    print('\nOutputs existence:')
    for p, exists in exist_map.items():
        print(f"- {p}: {exists}")

    interim = Path('data/interim')
    listing = sorted(p.name for p in interim.iterdir()) if interim.exists() else []
    print('\nContents of data/interim:', listing)

except Exception as e:
    print('Runner failed with exception:', e)
    raise


Starting create_strategic_subset_elec for full dataset (this will stream and write files)...
Parquet exists but CSV missing. Will create CSV: data/interim/elec_cleaned_full.csv
üîç Phase 1: Detecting complete schema across all years...
‚úÖ Detected 30 unique columns across all years

‚öôÔ∏è Phase 2: Processing and writing data with standardized schema...


  df[ts_col] = pd.to_datetime(df[ts_col], errors='coerce')
  df[ts_col] = pd.to_datetime(df[ts_col], errors='coerce')
  df[ts_col] = pd.to_datetime(df[ts_col], errors='coerce')
  df[ts_col] = pd.to_datetime(df[ts_col], errors='coerce')
  df[ts_col] = pd.to_datetime(df[ts_col], errors='coerce')
  df[ts_col] = pd.to_datetime(df[ts_col], errors='coerce')
  df[ts_col] = pd.to_datetime(df[ts_col], errors='coerce')
  df[ts_col] = pd.to_datetime(df[ts_col], errors='coerce')
  df[ts_col] = pd.to_datetime(df[ts_col], errors='coerce')
  df[ts_col] = pd.to_datetime(df[ts_col], errors='coerce')
  df[ts_col] = pd.to_datetime(df[ts_col], errors='coerce')
  df[ts_col] = pd.to_datetime(df[ts_col], errors='coerce')
  df[ts_col] = pd.to_datetime(df[ts_col], errors='coerce')
  df[ts_col] = pd.to_datetime(df[ts_col], errors='coerce')
  df[ts_col] = pd.to_datetime(df[ts_col], errors='coerce')
  df[ts_col] = pd.to_datetime(df[ts_col], errors='coerce')
  df[ts_col] = pd.to_datetime(df[ts_col], errors='coerce


‚úÖ Phase 2 complete: Processed all 25 files
Done. Total rows written: 434014
Full cleaned Parquet file: data/interim/elec_cleaned_full.parquet
Full cleaned CSV file: data/interim/elec_cleaned_full.csv
Sample file: data/interim/elec_cleaned_full_sample.csv

result: {'full_parquet': 'data/interim/elec_cleaned_full.parquet', 'full_csv': 'data/interim/elec_cleaned_full.csv', 'full_sample': 'data/interim/elec_cleaned_full_sample.csv', 'rows_written': 434014}

Outputs existence:
- data/interim/elec_cleaned_full.parquet: True
- data/interim/elec_cleaned_full_sample.csv: True

Contents of data/interim: ['elec_cleaned_full.csv', 'elec_cleaned_full.parquet', 'elec_cleaned_full_sample.csv']


In [None]:
# 2.3 Regenerate full CSV from Parquet with consistent null handling
# Run this cell to regenerate elec_cleaned_full.csv so that missing values are written as explicit 'NaN' markers (na_rep='NaN')
from pathlib import Path
import pandas as pd

def reformat_full_csv_with_standard_nulls(parquet_path, csv_path):
    """
    Regenerate the full CSV from Parquet with explicit 'NaN' null markers.

    Every missing value, including missing entries in the ``date`` and
    ``settlement_date`` columns, is written as the literal text 'NaN'
    (``na_rep='NaN'``).

    Parameters
    ----------
    parquet_path : str or Path
        Cleaned Parquet file to read (requires a pandas Parquet engine).
    csv_path : str or Path
        Destination CSV; overwritten.

    Returns
    -------
    pandas.DataFrame
        The frame that was written to ``csv_path``.
    """
    print(f"Reading Parquet file: {parquet_path}")
    df = pd.read_parquet(parquet_path)

    print("Converting to dict and back to DataFrame for consistent formatting...")
    # Rebuilding from records lets pandas re-infer dtypes (e.g. nullable Int64
    # columns without missing values come back as plain int64).
    df_csv = pd.DataFrame(df.to_dict('records'))

    # Convert non-null date values to strings but keep nulls as real nulls, so
    # na_rep applies to them. A blanket astype(str) would write them as the text
    # 'None'/'nan'/'NaT', and isnull() would no longer count them.
    for col in ['date', 'settlement_date']:
        if col in df_csv.columns and df_csv[col].dtype == object:
            values = df_csv[col]
            df_csv[col] = values.where(values.isna(), values.astype(str))

    print(f"Writing CSV with industry-standard null handling (explicit NaN): {csv_path}")
    # na_rep='NaN' writes explicit NaN text for null values
    df_csv.to_csv(csv_path, index=False, na_rep='NaN')

    print("‚úÖ Done! CSV file regenerated with standard null handling.")
    print(f"   Rows: {len(df_csv)}, Columns: {len(df_csv.columns)}")

    # Show null handling
    null_counts = df_csv.isnull().sum()
    print("\n   Null values per column (top 5):")
    print(null_counts[null_counts > 0].head())

    return df_csv

# Execute the reformatting
parquet_file = Path('data/interim/elec_cleaned_full.parquet')
csv_file = Path('data/interim/elec_cleaned_full.csv')

if parquet_file.exists():
    df_result = reformat_full_csv_with_standard_nulls(parquet_file, csv_file)
    
    # Verify the result
    print("\nüìä Verification:")
    print(f"Data types:\n{df_result.dtypes.value_counts()}")
    print(f"\nFirst 3 rows with nulls visible:")
    print(df_result.head(3).to_string())
else:
    print(f"‚ùå Parquet file not found: {parquet_file}")
    print("Run cell 3 first to generate the Parquet file.")