In [1]:
from openelectricity import AsyncOEClient
from openelectricity.types import DataMetric, MarketMetric, UnitStatusType
from datetime import datetime, timedelta
import paho.mqtt.client as mqtt
import json
import time
import pandas as pd
import asyncio
import re
import os
import pickle
import os
import json
from datetime import datetime, timedelta
from collections import Counter

# OpenElectricity API
from openelectricity import AsyncOEClient
from openelectricity.types import DataMetric, MarketMetric, UnitStatusType
from datetime import datetime, timedelta

# Async support
import asyncio
import nest_asyncio
nest_asyncio.apply()

# Optional: Suppress warnings
import warnings
warnings.filterwarnings('ignore')


# 1. Data Retrieval

In [2]:
# Define data fetching function
async def fetch_data_in_chunks(client, facility_codes, start_date, end_date, max_days=7, batch_size=10):
    """
    Args:
        client: AsyncOEClient instance
        facility_codes: List of facility codes to fetch data for
        start_date: Start date for data retrieval
        end_date: End date for data retrieval
        max_days: Maximum days per chunk (default 7 to stay within 8-day limit)
        batch_size: Number of facilities to fetch at once (reduced to 10)
    
    Returns:
        all_facility_data: List of facility power and emissions data
        all_market_data: List of market price and demand data
    """
    all_facility_data = []
    all_market_data = []
    market_data_fetched = False
    
    # Split facilities into batches
    total_facilities = len(facility_codes)
    num_batches = (total_facilities + batch_size - 1) // batch_size
    
    print(f"Total facilities: {total_facilities}")
    print(f"Batch size: {batch_size}")
    print(f"Number of batches: {num_batches}")
    print()
    
    for batch_idx in range(num_batches):
        start_idx = batch_idx * batch_size
        end_idx = min((batch_idx + 1) * batch_size, total_facilities)
        facility_batch = facility_codes[start_idx:end_idx]
        
        print(f"Processing batch {batch_idx + 1}/{num_batches} ({len(facility_batch)} facilities)")
        
        current_start = start_date
        
        while current_start < end_date:
            # Calculate end date for current time chunk
            current_end = min(current_start + timedelta(days=max_days), end_date)
            
            print(f"  Fetching data from {current_start.date()} to {current_end.date()}")
            
            try:
                # 1. Fetch facility data (power and emissions)
                facility_response = await client.get_facility_data(
                    network_code="NEM",
                    facility_code=facility_batch,  # Batch of facilities
                    metrics=[DataMetric.POWER, DataMetric.EMISSIONS],
                    interval="5m",
                    date_start=current_start,
                    date_end=current_end
                )
                
                # Process facility data - FIXED attribute access
                for series in facility_response.data:
                    metric_name = series.metric
                    unit = series.unit
                    
                    for result in series.results:
                        # Extract facility code from the result name
                        # The name format may vary, try to extract facility code
                        facility_name = result.name
                        
                        for point in result.data:
                            all_facility_data.append({
                                'timestamp': point.timestamp,
                                'facility_name': facility_name,
                                'metric': metric_name,
                                'value': point.value,
                                'unit': unit
                            })
                
                print(f"    ✓ Fetched {len(facility_response.data)} facility metric series")
                
            except Exception as e:
                print(f"    ! Error fetching facility data: {e}")
                # Continue with next batch instead of breaking completely
            
            # Fetch market data only once (not per facility batch)
            if not market_data_fetched:
                try:
                    # 2. Fetch market data (price and demand)
                    market_response = await client.get_market(
                        network_code="NEM",
                        metrics=[MarketMetric.PRICE, MarketMetric.DEMAND],
                        interval="5m",
                        date_start=current_start,
                        date_end=current_end,
                        primary_grouping="network_region"
                    )
                    
                    # Process market data
                    for series in market_response.data:
                        for result in series.results:
                            region = result.name.split("_")[-1] if "_" in result.name else result.name
                            for point in result.data:
                                all_market_data.append({
                                    'timestamp': point.timestamp,
                                    'region': region,
                                    'metric': series.metric,
                                    'value': point.value,
                                    'unit': series.unit
                                })
                    
                    print(f"    Fetched {len(market_response.data)} market metric series")
                    
                except Exception as e:
                    print(f"    Error fetching market data: {e}")
            
            # Move to next time chunk
            current_start = current_end + timedelta(minutes=5)
            
            # Add small delay to avoid API rate limiting
            await asyncio.sleep(0.5)
        
        # Mark market data as fetched after first batch's time chunks are done
        market_data_fetched = True
        
        # Delay between facility batches
        if batch_idx < num_batches - 1:
            print(f"  Waiting before next batch...")
            await asyncio.sleep(1)
        
        print()
    
    return all_facility_data, all_market_data


In [3]:
# Fetch list of operational facilities in NEM network
async with AsyncOEClient() as client:
    facilities_response = await client.get_facilities(
        network_id=["NEM"],
        status_id=[UnitStatusType.OPERATING]
    )
    
    # Extract facility codes
    facility_codes = [facility.code for facility in facilities_response.data]
    
    print(f"Found {len(facility_codes)} operational facilities")
    print(f"Sample facilities: {facility_codes[:5]}")

[2025-11-06 12:34:14] DEBUG [openelectricity.client.__init__:79] Initialized client with base URL: https://api.openelectricity.org.au/v4/
[2025-11-06 12:34:14] DEBUG [openelectricity.client.__init__:417] Initialized asynchronous client
[2025-11-06 12:34:14] DEBUG [openelectricity.client._ensure_client:422] Creating new async client session
[2025-11-06 12:34:14] DEBUG [openelectricity.client.get_facilities:450] Getting facilities
[2025-11-06 12:34:14] DEBUG [openelectricity.client.get_facilities:461] Request parameters: {'status_id': ['operating'], 'network_id': ['NEM']}
[2025-11-06 12:34:15] DEBUG [openelectricity.client._handle_response:438] Received successful response: 200
Found 420 operational facilities
Sample facilities: ['ADP', 'ALDGASF', 'ANGASTON', 'APPIN', 'ARWF']
[2025-11-06 12:34:15] DEBUG [openelectricity.client.close:600] Closing async client session


In [4]:
# Set time range (at least 1 week in October 2025)
start_date = datetime(2025, 10, 1, 0, 0, 0)
end_date = datetime(2025, 10, 8, 23, 55, 0)  # 8 days of data

print(f"Start time: {start_date}")
print(f"End time: {end_date}")
print(f"Data interval: 5 minutes")
print(f"Time span: {(end_date - start_date).days} days")

Start time: 2025-10-01 00:00:00
End time: 2025-10-08 23:55:00
Data interval: 5 minutes
Time span: 7 days


In [5]:
# Fetch facility and market data with smaller batches
async with AsyncOEClient() as client:
    facility_data, market_data = await fetch_data_in_chunks(
        client, 
        facility_codes,
        start_date, 
        end_date,
        max_days=7,  # Stay within 8-day limit
        batch_size=10  # Reduced batch size to avoid errors
    )

print(f"\nTotal facility data records fetched: {len(facility_data)}")
print(f"Total market data records fetched: {len(market_data)}")

[2025-11-06 12:34:17] DEBUG [openelectricity.client.__init__:79] Initialized client with base URL: https://api.openelectricity.org.au/v4/
[2025-11-06 12:34:17] DEBUG [openelectricity.client.__init__:417] Initialized asynchronous client
[2025-11-06 12:34:17] DEBUG [openelectricity.client._ensure_client:422] Creating new async client session
Total facilities: 420
Batch size: 10
Number of batches: 42

Processing batch 1/42 (10 facilities)
  Fetching data from 2025-10-01 to 2025-10-08
[2025-11-06 12:34:17] DEBUG [openelectricity.client.get_facility_data:534] Getting facility data for NEM/['ADP', 'ALDGASF', 'ANGASTON', 'APPIN', 'ARWF', 'AVLSF', 'DEIBDL', 'BAKING', 'BHWF', 'BALBESS'] (metrics: [<DataMetric.POWER: 'power'>, <DataMetric.EMISSIONS: 'emissions'>], interval: 5m)
[2025-11-06 12:34:17] DEBUG [openelectricity.client.get_facility_data:551] Request parameters: {'facility_code': ['ADP', 'ALDGASF', 'ANGASTON', 'APPIN', 'ARWF', 'AVLSF', 'DEIBDL', 'BAKING', 'BHWF', 'BALBESS'], 'metrics': 

In [2]:
# Fetch NEM facility locations

async def fetch_all_facilities():
    """
    Fetch all operational facilities in the NEM network
    
    Returns:
        List of facility data dictionaries
    """
    
    print("\n Connecting to OpenElectricity API...")
    
    async with AsyncOEClient() as client:
        # Fetch all operational facilities in NEM network
        # Correct parameter format: list[str] for network_id
        facilities_response = await client.get_facilities(
            network_id=["NEM"],  # Pass as list
            status_id=[UnitStatusType.OPERATING]  # Pass as list
        )
        
        print(f" Successfully fetched facility data")
        print(f" Total facilities found: {len(facilities_response.data)}")
        
        # Extract and convert facility data
        facilities = []
        
        for facility in facilities_response.data:
            # Handle location data safely
            location_data = None
            if hasattr(facility, 'location') and facility.location:
                location_data = {
                    'lat': facility.location.lat,
                    'lng': facility.location.lng,
                }
            
            # Convert facility object to dictionary
            facility_dict = {
                'code': facility.code if hasattr(facility, 'code') else None,
                'name': facility.name if hasattr(facility, 'name') else 'Unknown',
                'network_id': facility.network_id if hasattr(facility, 'network_id') else None,
                'network_region': facility.network_region if hasattr(facility, 'network_region') else None,
                'description': facility.description if hasattr(facility, 'description') else '',
                'npi_identifier': facility.npi_identifier if hasattr(facility, 'npi_identifier') else None,
                'location': location_data,
                'fueltech': facility.fueltech if hasattr(facility, 'fueltech') else None,
                'status': facility.status if hasattr(facility, 'status') else None,
                'capacity_registered': facility.capacity_registered if hasattr(facility, 'capacity_registered') else 0,
                'unit_ids': facility.unit_ids if hasattr(facility, 'unit_ids') else [],
                'units_number': facility.units_number if hasattr(facility, 'units_number') else 0,
                'created_at': str(facility.created_at) if hasattr(facility, 'created_at') and facility.created_at else None,
                'updated_at': str(facility.updated_at) if hasattr(facility, 'updated_at') and facility.updated_at else None,
            }
            facilities.append(facility_dict)
        
        return facilities

# Execute async function
print("\n Fetching facility data...")
facility_data = await fetch_all_facilities()

print(f"\n Facility data loaded!")
print(f" Total NEM facilities: {len(facility_data)}")

# Display sample facility data structure
if facility_data:
    print("\n Sample facility record structure:")
    sample = facility_data[0]
    print(f"  Keys available: {list(sample.keys())}")
    print(f"\n  Sample facility:")
    print(f"    Code: {sample.get('code', 'N/A')}")
    print(f"    Name: {sample.get('name', 'N/A')}")
    print(f"    Region: {sample.get('network_region', 'N/A')}")
    print(f"    Fuel Type: {sample.get('fueltech', 'N/A')}")
    print(f"    Has Location: {sample['location'] is not None}")
    if sample['location']:
        print(f"    Latitude: {sample['location']['lat']}")
        print(f"    Longitude: {sample['location']['lng']}")

# Show sample facilities with locations
print("\n Sample facilities with locations:")
print("-" * 110)
print(f"{'Facility Name':<50} | {'Code':<12} | {'Region':<5} | {'Lat':>9} | {'Lng':>10}")
print("-" * 110)

count = 0
for facility in facility_data:
    if facility['location'] and facility['location']['lat']:
        print(f"{facility['name'][:48]:<50} | {facility['code']:<12} | "
              f"{facility['network_region'] or 'N/A':<5} | "
              f"{facility['location']['lat']:>9.4f} | {facility['location']['lng']:>10.4f}")
        count += 1
        if count >= 15:
            break

# Quick statistics
total_with_location = sum(1 for f in facility_data if f['location'] and f['location']['lat'])
total_without_location = len(facility_data) - total_with_location

print(f"\n Location Statistics:")
print(f" Facilities with valid locations: {total_with_location} ({total_with_location/len(facility_data)*100:.1f}%)")
print(f" Facilities without locations: {total_without_location} ({total_without_location/len(facility_data)*100:.1f}%)")

# Show distribution by region
from collections import Counter
region_counts = Counter([f['network_region'] for f in facility_data if f['network_region']])
print(f"\n Facilities by Region:")
for region, count in sorted(region_counts.items()):
    print(f"  {region}: {count}")



 Fetching facility data...

 Connecting to OpenElectricity API...
[2025-11-07 10:56:28] DEBUG [openelectricity.client.__init__:79] Initialized client with base URL: https://api.openelectricity.org.au/v4/
[2025-11-07 10:56:28] DEBUG [openelectricity.client.__init__:417] Initialized asynchronous client
[2025-11-07 10:56:28] DEBUG [openelectricity.client._ensure_client:422] Creating new async client session
[2025-11-07 10:56:29] DEBUG [openelectricity.client.get_facilities:450] Getting facilities
[2025-11-07 10:56:29] DEBUG [openelectricity.client.get_facilities:461] Request parameters: {'status_id': ['operating'], 'network_id': ['NEM']}
[2025-11-07 10:56:30] DEBUG [openelectricity.client._handle_response:438] Received successful response: 200
 Successfully fetched facility data
 Total facilities found: 420
[2025-11-07 10:56:30] DEBUG [openelectricity.client.close:600] Closing async client session

 Facility data loaded!
 Total NEM facilities: 420

 Sample facility record structure:
  Ke

In [3]:
# Extract location information from facility data 

facility_locations = {}
facilities_without_location = []
invalid_coordinates = []

print(f"\n Processing {len(facility_data)} facility records...")

for record in facility_data:
    try:
        # Extract facility code (required)
        facility_code = record.get('code')
        
        if not facility_code:
            continue
        
        # Extract location object
        location = record.get('location')
        
        if not location or not isinstance(location, dict):
            facilities_without_location.append({
                'code': facility_code,
                'name': record.get('name', 'Unknown'),
                'reason': 'No location object'
            })
            continue
        
        # Extract lat/lng from FacilityLocation object
        lat = location.get('lat')
        lng = location.get('lng')
        
        # Skip if no valid coordinates
        if lat is None or lng is None:
            facilities_without_location.append({
                'code': facility_code,
                'name': record.get('name', 'Unknown'),
                'reason': 'Missing lat/lng in location object'
            })
            continue
        
        # Convert to float and validate
        try:
            lat = float(lat)
            lng = float(lng)
            
            # Basic validation for Australian coordinates
            # Australia: lat ~ -10 to -45, lng ~ 110 to 155
            if not (-50 < lat < 0 and 100 < lng < 160):
                invalid_coordinates.append({
                    'code': facility_code,
                    'name': record.get('name', 'Unknown'),
                    'lat': lat,
                    'lng': lng,
                    'reason': 'Coordinates outside Australia bounds'
                })
                continue
                
        except (ValueError, TypeError) as e:
            facilities_without_location.append({
                'code': facility_code,
                'name': record.get('name', 'Unknown'),
                'reason': f'Invalid coordinate format: {e}'
            })
            continue
        
        # Extract all available facility information (with safe defaults)
        facility_locations[facility_code] = {
            'code': facility_code,
            'name': record.get('name') or 'Unknown',
            'lat': lat,
            'lng': lng,
            'network_id': record.get('network_id') or 'Unknown',
            'network_region': record.get('network_region') or 'Unknown',
            'description': record.get('description') or '',
            'npi_identifier': record.get('npi_identifier') or '',
            'fueltech': record.get('fueltech') or 'Unknown',
            'status': record.get('status') or 'Unknown',  # Ensure not None
            'capacity_registered': float(record.get('capacity_registered') or 0),
            'unit_ids': record.get('unit_ids') or [],
            'units_number': record.get('units_number') or 0,
            'created_at': record.get('created_at') or '',
            'updated_at': record.get('updated_at') or '',
        }
        
    except Exception as e:
        print(f" Error processing facility {record.get('code', 'unknown')}: {e}")
        continue

# Summary statistics
total_processed = len(facility_data)
total_with_locations = len(facility_locations)
total_without_locations = len(facilities_without_location)
total_invalid = len(invalid_coordinates)

print(f"\n" + "=" * 60)
print("EXTRACTION SUMMARY")
print("=" * 60)
print(f" Successfully extracted locations: {total_with_locations}")
print(f" Facilities without location data: {total_without_locations}")
print(f" Invalid coordinates: {total_invalid}")
print(f" Success rate: {(total_with_locations/total_processed*100):.1f}%")

# Show facilities without locations (if any)
if facilities_without_location:
    print(f"\n Facilities without location data (showing first 15):")
    print("-" * 90)
    print(f"{'Code':<15} | {'Facility Name':<50} | {'Reason':<20}")
    print("-" * 90)
    for i, fac in enumerate(facilities_without_location[:15], 1):
        reason = fac.get('reason', 'Unknown')
        print(f"{fac['code']:<15} | {fac['name'][:48]:<50} | {reason:<20}")
    
    if len(facilities_without_location) > 15:
        print(f"... and {len(facilities_without_location) - 15} more")

# Show invalid coordinates (if any)
if invalid_coordinates:
    print(f"\n Facilities with invalid coordinates:")
    print("-" * 100)
    print(f"{'Code':<15} | {'Facility Name':<40} | {'Lat':>10} | {'Lng':>10} | {'Reason':<20}")
    print("-" * 100)
    for fac in invalid_coordinates:
        print(f"{fac['code']:<15} | {fac['name'][:38]:<40} | "
              f"{fac['lat']:>10.4f} | {fac['lng']:>10.4f} | {fac.get('reason', 'N/A'):<20}")

# Display sample of successfully extracted locations
if facility_locations:
    print(f"\n Sample of successfully extracted locations (first 20):")
    print("-" * 120)
    print(f"{'Facility Name':<45} | {'Code':<12} | {'Region':<6} | {'Fuel Type':<15} | {'Lat':>9} | {'Lng':>10}")
    print("-" * 120)
    
    for i, (code, info) in enumerate(list(facility_locations.items())[:20], 1):
        name = (info['name'] or 'Unknown')[:43]
        fueltech = (info['fueltech'] or 'Unknown')[:13]
        region = (info['network_region'] or 'N/A')[:4]
        
        print(f"{name:<45} | {info['code']:<12} | {region:<6} | "
              f"{fueltech:<15} | {info['lat']:>9.4f} | {info['lng']:>10.4f}")
    
    if len(facility_locations) > 20:
        print(f"... and {len(facility_locations) - 20} more facilities")

# Summary message
print(f"\n Data is stored in the 'facility_locations' dictionary")


 Processing 420 facility records...

EXTRACTION SUMMARY
 Successfully extracted locations: 420
 Facilities without location data: 0
 Invalid coordinates: 0
 Success rate: 100.0%

 Sample of successfully extracted locations (first 20):
------------------------------------------------------------------------------------------------------------------------
Facility Name                                 | Code         | Region | Fuel Type       |       Lat |        Lng
------------------------------------------------------------------------------------------------------------------------
Adelaide Desalination                         | ADP          | SA1    | Unknown         |  -35.0969 |   138.4841
Aldoga                                        | ALDGASF      | QLD1   | Unknown         |  -23.8395 |   151.0849
Angaston                                      | ANGASTON     | SA1    | Unknown         |  -34.5039 |   139.0243
Appin                                         | APPIN        | NSW1   

In [4]:
# Save 'facility_locations' dictionary to pkl file
with open('facility_locations.pkl', 'wb') as f:
    pickle.dump(facility_locations, f)

print(f"✓ Saved {len(facility_locations)} facilities to facility_locations.pkl")

✓ Saved 420 facilities to facility_locations.pkl


# 2 Data Integration and Materialization/Caching

In [107]:
# Convert to DataFrames
df_facility = pd.DataFrame(facility_data)
df_market = pd.DataFrame(market_data)

## 2.1 Process Facility Data

In [108]:
# INSPECT FACILITY DATA STRUCTURE (DETAILED)

# Basic info
print("Facility data basic info:")
print(f"  Shape: {df_facility.shape}")
print(f"  Columns: {list(df_facility.columns)}")
print()

# Data types
print("Data types:")
print(df_facility.dtypes)
print()

# Sample data
print("Sample facility data (first 30 rows):")
print(df_facility.head(30))
print()

# Check unique values for each column
print("="*80)
print("UNIQUE VALUES ANALYSIS")
print("="*80)
print()

print(f"Unique facility_name: {df_facility['facility_name'].nunique()}")
print(f"  Sample facility_names (first 20):")
for i, name in enumerate(sorted(df_facility['facility_name'].unique())[:20], 1):
    print(f"    {i}. {name}")
print()

print(f"Unique metric: {df_facility['metric'].nunique()}")
print(f"  All metrics: {sorted(df_facility['metric'].unique())}")
print()

print(f"Unique timestamps: {df_facility['timestamp'].nunique()}")
print(f"  Timestamp range: {df_facility['timestamp'].min()} to {df_facility['timestamp'].max()}")
print()

# Value statistics by metric
print("="*80)
print("VALUE STATISTICS BY METRIC")
print("="*80)
print()

for metric in sorted(df_facility['metric'].unique()):
    metric_data = df_facility[df_facility['metric'] == metric]
    print(f"\nMetric: {metric}")
    print(f"  Records: {len(metric_data):,}")
    print(f"  Value range: {metric_data['value'].min():.3f} to {metric_data['value'].max():.3f}")
    print(f"  Mean: {metric_data['value'].mean():.3f}")
    
    zero_count = (metric_data['value'] == 0).sum()
    non_zero_count = (metric_data['value'] != 0).sum()
    null_count = metric_data['value'].isna().sum()
    
    print(f"  Zero values: {zero_count:,} ({zero_count/len(metric_data)*100:.2f}%)")
    print(f"  Non-zero values: {non_zero_count:,} ({non_zero_count/len(metric_data)*100:.2f}%)")
    print(f"  Null values: {null_count:,} ({null_count/len(metric_data)*100:.2f}%)")

print()

Facility data basic info:
  Shape: (2240118, 5)
  Columns: ['timestamp', 'facility_name', 'metric', 'value', 'unit']

Data types:
timestamp        datetime64[ns, +10:00]
facility_name                    object
metric                           object
value                           float64
unit                             object
dtype: object

Sample facility data (first 30 rows):
                   timestamp facility_name metric  value unit
0  2025-10-01 00:00:00+10:00  power_ADPBA1  power -0.004   MW
1  2025-10-01 00:05:00+10:00  power_ADPBA1  power -0.046   MW
2  2025-10-01 00:10:00+10:00  power_ADPBA1  power  0.000   MW
3  2025-10-01 00:15:00+10:00  power_ADPBA1  power  0.003   MW
4  2025-10-01 00:20:00+10:00  power_ADPBA1  power -0.018   MW
5  2025-10-01 00:25:00+10:00  power_ADPBA1  power  0.004   MW
6  2025-10-01 00:30:00+10:00  power_ADPBA1  power -0.001   MW
7  2025-10-01 00:35:00+10:00  power_ADPBA1  power -0.003   MW
8  2025-10-01 00:40:00+10:00  power_ADPBA1  power  0.076   

In [109]:
# Remove 'power_' prefix
df_facility['facility_code'] = df_facility['facility_name'].str.replace(
    r'^power_', '', regex=True
)

print("After removing 'power_' prefix:")
print(f"  Unique facility codes: {df_facility['facility_code'].nunique()}")
print(f"  Sample: {df_facility['facility_code'].unique()[:10]}")
print()

After removing 'power_' prefix:
  Unique facility codes: 1066
  Sample: ['ADPBA1' 'ADPBA1G' 'ADPPV1' 'ALDGASF1' 'ANGAST1' 'ARWF1' 'AVLSF1' 'BALB1'
 'BALBG1' 'BALDHWF1']



In [110]:
# Also remove 'emissions_' prefix 
df_facility['facility_code'] = df_facility['facility_code'].str.replace(
    r'^emissions_', '', regex=True
)

print("After removing 'emissions_' prefix:")
print(f"  Unique facility codes: {df_facility['facility_code'].nunique()}")
print(f"  Sample: {df_facility['facility_code'].unique()[:10]}")
print()

After removing 'emissions_' prefix:
  Unique facility codes: 533
  Sample: ['ADPBA1' 'ADPBA1G' 'ADPPV1' 'ALDGASF1' 'ANGAST1' 'ARWF1' 'AVLSF1' 'BALB1'
 'BALBG1' 'BALDHWF1']



In [111]:
# CONVERT METRICS TO COLUMNS

# Use pivot_table to convert metric rows to columns
df_facility_pivot = df_facility.pivot_table(
    index=['timestamp', 'facility_code'],
    columns='metric',
    values='value',
    aggfunc='first'
).reset_index()

# Remove the 'metric' name from columns
df_facility_pivot.columns.name = None

print(f"✓ Pivoted facility data shape: {df_facility_pivot.shape}")
print(f"✓ Columns after pivot: {list(df_facility_pivot.columns)}")
print()

# Display sample pivoted data
print("Sample pivoted facility data (first 10 rows):")
print(df_facility_pivot.head(10))
print()


✓ Pivoted facility data shape: (1120059, 4)
✓ Columns after pivot: ['timestamp', 'facility_code', 'emissions', 'power']

Sample pivoted facility data (first 10 rows):
                  timestamp facility_code  emissions   power
0 2025-10-01 00:00:00+10:00        ADPBA1        0.0  -0.004
1 2025-10-01 00:00:00+10:00       ADPBA1L        0.0   0.004
2 2025-10-01 00:00:00+10:00        ADPPV1        0.0   0.000
3 2025-10-01 00:00:00+10:00        AGLHAL        0.0   0.000
4 2025-10-01 00:00:00+10:00        AGLSOM        0.0   0.000
5 2025-10-01 00:00:00+10:00      ALDGASF1        0.0   0.000
6 2025-10-01 00:00:00+10:00       ANGAST1        0.0   0.000
7 2025-10-01 00:00:00+10:00         ARWF1        0.0  35.400
8 2025-10-01 00:00:00+10:00        AVLSF1        0.0   0.000
9 2025-10-01 00:00:00+10:00         BALB1        0.0   0.000



In [112]:
# CHECK FOR NULL VALUES AFTER PIVOT4

null_counts = df_facility_pivot.isnull().sum()

for col, count in null_counts.items():
    if count > 0:
        print("Null value counts by column:")
        pct = (count / len(df_facility_pivot)) * 100
        print(f"  {col}: {count:,} ({pct:.2f}%)")

if null_counts.sum() == 0:
    print(" No null values found")
print()

 No null values found



In [113]:
# ADD TIME COMPONENTS TO FACILITY DATA

# Ensure timestamp is datetime
df_facility_pivot['timestamp'] = pd.to_datetime(df_facility_pivot['timestamp'])

# Add date and hour columns
df_facility_pivot['date'] = df_facility_pivot['timestamp'].dt.date
df_facility_pivot['hour'] = df_facility_pivot['timestamp'].dt.hour

print("Added 'date' and 'hour' columns")
print(f" Timestamp range: {df_facility_pivot['timestamp'].min()} to {df_facility_pivot['timestamp'].max()}")
print(f" Date range: {df_facility_pivot['date'].min()} to {df_facility_pivot['date'].max()}")
print(f" Hour range: {df_facility_pivot['hour'].min()} to {df_facility_pivot['hour'].max()}")
print()

# Display updated columns
print(f"Updated columns: {list(df_facility_pivot.columns)}")
print()

Added 'date' and 'hour' columns
 Timestamp range: 2025-10-01 00:00:00+10:00 to 2025-10-08 23:50:00+10:00
 Date range: 2025-10-01 to 2025-10-08
 Hour range: 0 to 23

Updated columns: ['timestamp', 'facility_code', 'emissions', 'power', 'date', 'hour']



In [114]:
# REORDER COLUMNS FOR FACILITY DATA

# Check which metric columns exist
metric_cols = []
if 'emissions' in df_facility_pivot.columns:
    metric_cols.append('emissions')
if 'power' in df_facility_pivot.columns:
    metric_cols.append('power')


# Define desired column order
base_cols = ['timestamp', 'date', 'hour', 'facility_code']
final_cols = base_cols + metric_cols

# Reorder columns
df_facility_pivot = df_facility_pivot[final_cols]


In [122]:
# Final facility data shape
print(f"Shape: {df_facility_pivot.shape}")
print(f"Columns: {list(df_facility_pivot.columns)}")

# Display final sample
print("Final facility data sample (first 10 rows):")
print(df_facility_pivot.head(10))
print()

Shape: (1120059, 6)
Columns: ['timestamp', 'date', 'hour', 'facility_code', 'emissions', 'power']
Final facility data sample (first 10 rows):
                  timestamp        date  hour facility_code  emissions   power
0 2025-10-01 00:00:00+10:00  2025-10-01     0        ADPBA1        0.0  -0.004
1 2025-10-01 00:00:00+10:00  2025-10-01     0       ADPBA1L        0.0   0.004
2 2025-10-01 00:00:00+10:00  2025-10-01     0        ADPPV1        0.0   0.000
3 2025-10-01 00:00:00+10:00  2025-10-01     0        AGLHAL        0.0   0.000
4 2025-10-01 00:00:00+10:00  2025-10-01     0        AGLSOM        0.0   0.000
5 2025-10-01 00:00:00+10:00  2025-10-01     0      ALDGASF1        0.0   0.000
6 2025-10-01 00:00:00+10:00  2025-10-01     0       ANGAST1        0.0   0.000
7 2025-10-01 00:00:00+10:00  2025-10-01     0         ARWF1        0.0  35.400
8 2025-10-01 00:00:00+10:00  2025-10-01     0        AVLSF1        0.0   0.000
9 2025-10-01 00:00:00+10:00  2025-10-01     0         BALB1        0

## 2.2 Process Market Data

In [116]:
# INSPECT MARKET DATA STRUCTURE

# Basic info
print("Market data basic info:")
print(f"  Shape: {df_market.shape}")
print(f"  Columns: {list(df_market.columns)}")
print()

# Data types
print("Data types:")
print(df_market.dtypes)
print()

# Sample data
print("Sample market data (first 20 rows):")
print(df_market.head(20))
print()

# Check unique values
print("Unique values:")
print(f"  Unique regions: {df_market['region'].nunique()}")
print(f"    Regions: {sorted(df_market['region'].unique())}")
print()

if 'market_name' in df_market.columns:
    print(f"  Unique market_name: {df_market['market_name'].nunique()}")
    print(f"    Market names: {sorted(df_market['market_name'].unique())}")
    print()

# Check timestamp range
print(f"  Timestamp range: {df_market['timestamp'].min()} to {df_market['timestamp'].max()}")
print(f"  Unique timestamps: {df_market['timestamp'].nunique()}")
print()

# Value statistics
print("Value statistics:")
print(df_market['value'].describe())
print()

# Check for zero/non-zero values
zero_values = (df_market['value'] == 0).sum()
non_zero_values = (df_market['value'] != 0).sum()
null_values = df_market['value'].isna().sum()

print(f"  Zero values: {zero_values:,} ({zero_values/len(df_market)*100:.2f}%)")
print(f"  Non-zero values: {non_zero_values:,} ({non_zero_values/len(df_market)*100:.2f}%)")
print(f"  Null values: {null_values:,} ({null_values/len(df_market)*100:.2f}%)")
print()

# Sample non-zero values
print("Sample non-zero market data:")
print(df_market[df_market['value'] != 0].head(10))
print()

Market data basic info:
  Shape: (23020, 5)
  Columns: ['timestamp', 'region', 'metric', 'value', 'unit']

Data types:
timestamp    datetime64[ns, +10:00]
region                       object
metric                       object
value                       float64
unit                         object
dtype: object

Sample market data (first 20 rows):
                   timestamp region metric  value   unit
0  2025-10-01 00:00:00+10:00   NSW1  price  56.98  $/MWh
1  2025-10-01 00:05:00+10:00   NSW1  price  80.01  $/MWh
2  2025-10-01 00:10:00+10:00   NSW1  price  58.40  $/MWh
3  2025-10-01 00:15:00+10:00   NSW1  price  80.01  $/MWh
4  2025-10-01 00:20:00+10:00   NSW1  price  65.00  $/MWh
5  2025-10-01 00:25:00+10:00   NSW1  price  63.83  $/MWh
6  2025-10-01 00:30:00+10:00   NSW1  price  61.05  $/MWh
7  2025-10-01 00:35:00+10:00   NSW1  price  57.13  $/MWh
8  2025-10-01 00:40:00+10:00   NSW1  price  61.05  $/MWh
9  2025-10-01 00:45:00+10:00   NSW1  price  57.01  $/MWh
10 2025-10-01 00:50:00+

In [117]:
# Pivot market data
df_market_pivot = df_market.pivot_table(
    index=['timestamp', 'region'],
    columns='metric',
    values='value',
    aggfunc='first'
).reset_index()

print(f"Pivoted market data shape: {df_market_pivot.shape}")
print(f"Columns: {list(df_market_pivot.columns)}")
print()

Pivoted market data shape: (11490, 4)
Columns: ['timestamp', 'region', 'demand', 'price']



In [118]:
# Add hour and date columns
df_facility_pivot['timestamp'] = pd.to_datetime(df_facility_pivot['timestamp'])
df_facility_pivot['hour'] = df_facility_pivot['timestamp'].dt.hour
df_facility_pivot['date'] = df_facility_pivot['timestamp'].dt.date

df_market_pivot['timestamp'] = pd.to_datetime(df_market_pivot['timestamp'])
df_market_pivot['hour'] = df_market_pivot['timestamp'].dt.hour
df_market_pivot['date'] = df_market_pivot['timestamp'].dt.date

In [119]:
# Final column ordering
# Reorder facility columns for better readability
facility_cols = ['timestamp', 'date', 'hour', 'facility_name', 'facility_code', 
                 'region', 'emissions', 'power']
# Only select columns that exist
facility_cols_exist = [col for col in facility_cols if col in df_facility_pivot.columns]
# Add any remaining columns
remaining_cols = [col for col in df_facility_pivot.columns if col not in facility_cols_exist]
df_facility_pivot = df_facility_pivot[facility_cols_exist + remaining_cols]

# Reorder market columns
market_cols = ['timestamp', 'date', 'hour', 'region', 'demand', 'price']
market_cols_exist = [col for col in market_cols if col in df_market_pivot.columns]
remaining_market_cols = [col for col in df_market_pivot.columns if col not in market_cols_exist]
df_market_pivot = df_market_pivot[market_cols_exist + remaining_market_cols]

In [124]:
# Final Market data shape
print(f"Shape: {df_market_pivot.shape}")
print(f"Columns: {list(df_market_pivot.columns)}")

# Display final sample
print("Final market data sample (first 10 rows):")
print(df_market_pivot.head(10))
print()

Shape: (11490, 6)
Columns: ['timestamp', 'date', 'hour', 'region', 'demand', 'price']
Final market data sample (first 10 rows):
metric                 timestamp        date  hour region   demand  price
0      2025-10-01 00:00:00+10:00  2025-10-01     0   NSW1  7105.57  56.98
1      2025-10-01 00:00:00+10:00  2025-10-01     0   QLD1  5989.24  54.82
2      2025-10-01 00:00:00+10:00  2025-10-01     0    SA1  1564.92   8.11
3      2025-10-01 00:00:00+10:00  2025-10-01     0   TAS1   898.71   0.12
4      2025-10-01 00:00:00+10:00  2025-10-01     0   VIC1  4893.49   8.95
5      2025-10-01 00:05:00+10:00  2025-10-01     0   NSW1  7170.68  80.01
6      2025-10-01 00:05:00+10:00  2025-10-01     0   QLD1  5920.40  67.30
7      2025-10-01 00:05:00+10:00  2025-10-01     0    SA1  1565.38   0.01
8      2025-10-01 00:05:00+10:00  2025-10-01     0   TAS1   897.18   0.20
9      2025-10-01 00:05:00+10:00  2025-10-01     0   VIC1  4889.73   0.01



## 2.3 Save Facility and Market data to CSV files

In [121]:
# Create output directory
output_dir = 'nem_data_output'
os.makedirs(output_dir, exist_ok=True)

# Save facility data
facility_file = os.path.join(output_dir, 'nem_facility_data.csv')
df_facility_pivot.to_csv(facility_file, index=False)
file_size_mb = os.path.getsize(facility_file) / (1024 * 1024)
print(f"✓ Saved: {facility_file}")
print(f"  Rows: {len(df_facility_pivot):,}")
print(f"  Columns: {len(df_facility_pivot.columns)}")
print(f"  Size: {file_size_mb:.2f} MB")
print()

# Save market data
market_file = os.path.join(output_dir, 'nem_market_data.csv')
df_market_pivot.to_csv(market_file, index=False)
file_size_mb = os.path.getsize(market_file) / (1024 * 1024)
print(f"✓ Saved: {market_file}")
print(f"  Rows: {len(df_market_pivot):,}")
print(f"  Columns: {len(df_market_pivot.columns)}")
print(f"  Size: {file_size_mb:.2f} MB")
print()


✓ Saved: nem_data_output\nem_facility_data.csv
  Rows: 1,120,059
  Columns: 6
  Size: 62.18 MB

✓ Saved: nem_data_output\nem_market_data.csv
  Rows: 11,490
  Columns: 6
  Size: 0.65 MB



# 3. Data Publishing via MQTT

In [4]:
# MQTT Configuration

# UNIQUE TOPIC
MQTT_TOPIC = "szha0285/ass2"

# Broker Selection
USE_TEST_BROKER = False  # Set to False for campus broker, set to True for test broker

if USE_TEST_BROKER:
    BROKER_HOST = "test.mosquitto.org"
    BROKER_PORT = 1883
else:
    BROKER_HOST = "172.17.34.107"
    BROKER_PORT = 1883

print(f"MQTT Broker: {BROKER_HOST}:{BROKER_PORT}")
print(f"MQTT Topic: {MQTT_TOPIC}")


MQTT Broker: 172.17.34.107:1883
MQTT Topic: szha0285/ass2


In [7]:
# MQTT Setup

# Define what happens upon connection to the server
def on_connect(client, userdata, connect_flags, reason_code, properties):
    if reason_code == 0:
        print(f"✓ Connected successfully (result code: {reason_code})")
        client.subscribe(MQTT_TOPIC)
        print(f"✓ Subscribed to topic: {MQTT_TOPIC}")
    else:
        print(f"✗ Connection failed with result code: {reason_code}")

# Define what happens upon receiving a message from the server
def on_message(client, userdata, msg):
    print(f"Received message on topic {msg.topic}: {msg.payload.decode()}")

# Define what happens on disconnect
def on_disconnect(client, userdata, disconnect_flags, reason_code, properties):
    if reason_code != 0:
        print(f"✗ Unexpected disconnection (code: {reason_code})")

# Create MQTT client
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect

# Connect to the MQTT server with timeout handling
print("\nConnecting to MQTT broker...")
print(f"Attempting connection to {BROKER_HOST}:{BROKER_PORT}...")
try:
    # Set a socket timeout to detect connection issues faster
    client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)
    print("✓ Connection initiated")
    
    # Start the network loop in background
    client.loop_start()
    
    # Wait for connection to complete
    print("Waiting for connection to establish...")
    time.sleep(3)
    
    print("✓ MQTT client ready")
    
except Exception as e:
    print(f"✗ Failed to connect: {e}")
    print("\n TROUBLESHOOTING:")
    print("  1. Check if you're connected to the university network/VPN")
    print("  2. Verify the broker IP address is correct")
    print("  3. If using a public test broker (test.mosquitto.org):")
    raise



Connecting to MQTT broker...
Attempting connection to 172.17.34.107:1883...
✓ Connection initiated
Waiting for connection to establish...
✓ Connected successfully (result code: Success)
✓ Subscribed to topic: szha0285/ass2
✓ MQTT client ready
Received message on topic szha0285/ass2: {"event_timestamp": "2025-10-01T00:00:00+10:00", "facility_code": "ADPBA1", "power_mw": -0.004, "emissions_tco2e": 0.0, "hour": 0}
Received message on topic szha0285/ass2: {"event_timestamp": "2025-10-01T00:00:00+10:00", "facility_code": "QBYNB1", "power_mw": -0.1765, "emissions_tco2e": 0.0, "hour": 0}
Received message on topic szha0285/ass2: {"event_timestamp": "2025-10-01T00:00:00+10:00", "facility_code": "PUMP2", "power_mw": 0.0, "emissions_tco2e": 0.0, "hour": 0}
Received message on topic szha0285/ass2: {"event_timestamp": "2025-10-01T00:00:00+10:00", "facility_code": "PUMP1", "power_mw": 0.0, "emissions_tco2e": 0.0, "hour": 0}
Received message on topic szha0285/ass2: {"event_timestamp": "2025-10-01T00

In [8]:
# Load and Prepare Data

# Load facility and market data
df_facility = pd.read_csv('nem_data_output/nem_facility_data.csv')
df_market = pd.read_csv('nem_data_output/nem_market_data.csv')

# Convert timestamp to datetime for proper sorting
df_facility['timestamp'] = pd.to_datetime(df_facility['timestamp'])
df_market['timestamp'] = pd.to_datetime(df_market['timestamp'])

print(f"\n Loaded facility data: {len(df_facility):,} records")
print(f" Loaded market data: {len(df_market):,} records")



 Loaded facility data: 1,120,059 records
 Loaded market data: 11,490 records


In [9]:
# Sort by Event Time (Chronological Order)

# Sort by timestamp to ensure chronological event order
df_filtered = df_facility.sort_values('timestamp').reset_index(drop=True)

print("✓ Data sorted by event timestamp")
print(f"  Event time range: {df_filtered['timestamp'].min()} to {df_filtered['timestamp'].max()}")
print(f"  Time span: {(df_filtered['timestamp'].max() - df_filtered['timestamp'].min()).total_seconds() / 3600:.1f} hours")
print(f"  Unique facilities: {df_filtered['facility_code'].nunique()}")
print()


✓ Data sorted by event timestamp
  Event time range: 2025-10-01 00:00:00+10:00 to 2025-10-08 23:50:00+10:00
  Time span: 191.8 hours
  Unique facilities: 533



In [10]:
# Sample Message Display

print("First 5 records to be published:")
sample_df = df_filtered[['timestamp', 'facility_code', 'power', 'emissions']].head(5)
print(sample_df.to_string(index=False))
print()

print("JSON message format (example):")
sample_row = df_filtered.iloc[0]
sample_message = {
    "event_timestamp": sample_row['timestamp'].isoformat(),
    "facility_code": sample_row['facility_code'],
    "power_mw": float(sample_row['power']),
    "emissions_tco2e": float(sample_row['emissions']),
    "hour": int(sample_row['hour'])
}
print(json.dumps(sample_message, indent=2))
print()


First 5 records to be published:
                timestamp facility_code   power  emissions
2025-10-01 00:00:00+10:00        ADPBA1 -0.0040        0.0
2025-10-01 00:00:00+10:00        QBYNB1 -0.1765        0.0
2025-10-01 00:00:00+10:00         PUMP2  0.0000        0.0
2025-10-01 00:00:00+10:00         PUMP1  0.0000        0.0
2025-10-01 00:00:00+10:00       PTSTAN1  0.0000        0.0

JSON message format (example):
{
  "event_timestamp": "2025-10-01T00:00:00+10:00",
  "facility_code": "ADPBA1",
  "power_mw": -0.004,
  "emissions_tco2e": 0.0,
  "hour": 0
}



In [11]:
# Publishing Function

def publish_facility_stream(df, topic, delay=0.1, restart_delay=60):
    """
    Publish facility power and emissions data as a continuous stream.
    Only publishes NEW records (records where power or emissions changed from last value).
    
    Args:
        df: DataFrame with facility data sorted by event timestamp
        topic: MQTT topic to publish to
        delay: Delay between messages in seconds (exactly 0.1s)
        restart_delay: Delay before restarting replay (60s)
    """
    
    total_records = len(df)
    cumulative_published = 0
    cumulative_skipped = 0
    replay_count = 0
    
    print("\n" + "=" * 70)
    print("MQTT STREAM PUBLISHING STARTED")
    print("=" * 70)
    print(f"Total records per replay: {total_records:,}")
    print(f"Message delay: {delay}s (exactly 0.1 second)")
    print(f"Restart delay: {restart_delay}s (when data exhausted)")
    print(f"Publishing strategy: Only NEW records (changed values)")
    print(f"MQTT topic: {topic}")
    print(f"MQTT broker: {BROKER_HOST}:{BROKER_PORT}")
    print(f"Event time range: {df['timestamp'].min()} to {df['timestamp'].max()}")
    print("=" * 70 + "\n")
    
    print("Stream publishing in progress...")
    print("(Press Ctrl+C to stop)\n")
    
    try:
        while True:  # Infinite loop for continuous replay
            replay_count += 1
            replay_start_time = time.time()
            replay_published = 0
            replay_skipped = 0
            
            # Track last values per facility to detect changes
            last_values = {}  # facility_code: (power, emissions)
            
            print("_" * 70)
            print(f"REPLAY #{replay_count} - Started at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            print("_" * 70 + "\n")
            
            # Publish each record in chronological order
            for idx, row in df.iterrows():
                # Get current values
                current_power = float(row['power'])
                current_emissions = float(row['emissions'])
                facility_code = row['facility_code']
                
                # Check if this is a NEW record (changed from last value)
                should_publish = True
                
                if facility_code in last_values:
                    last_power, last_emissions = last_values[facility_code]
                    # Skip if BOTH power AND emissions are unchanged
                    if last_power == current_power and last_emissions == current_emissions:
                        should_publish = False
                        replay_skipped += 1
                        cumulative_skipped += 1
                
                # Update last values for this facility
                last_values[facility_code] = (current_power, current_emissions)
                
                if not should_publish:
                    continue
                
                # Create message payload with ALL necessary information
                message = {
                    "event_timestamp": row['timestamp'].isoformat(),
                    "facility_code": row['facility_code'],
                    "power_mw": current_power,
                    "emissions_tco2e": current_emissions,
                    "hour": int(row['hour'])
                }
                
                # Convert to JSON
                message_json = json.dumps(message)
                
                # Publish to MQTT broker
                result = client.publish(topic, message_json, qos=0)
                
                # Check if publish was successful
                if result.rc != mqtt.MQTT_ERR_SUCCESS:
                    print(f"! Warning: Publish failed with code {result.rc}")
                
                replay_published += 1
                cumulative_published += 1
                
                # Progress reporting every 500 messages
                if replay_published % 500 == 0:
                    progress_pct = ((idx + 1) / total_records) * 100
                    elapsed = time.time() - replay_start_time
                    rate = replay_published / elapsed if elapsed > 0 else 0
                    
                    print(f"{datetime.now().strftime('%H:%M:%S')} | "
                          f"Progress: {idx + 1:,}/{total_records:,} ({progress_pct:.1f}%) | "
                          f"Published: {replay_published:,} | "
                          f"Skipped: {replay_skipped:,} | "
                          f"Rate: {rate:.1f} msg/s")
                
                # EXACTLY 0.1 second delay between messages
                time.sleep(delay)
            
            # Replay completed
            replay_duration = time.time() - replay_start_time
            
            print("\n" + "=" * 70)
            print(f"REPLAY #{replay_count} COMPLETED")
            print("=" * 70)
            print(f"Total records processed: {total_records:,}")
            print(f"Records published (new): {replay_published:,}")
            print(f"Records skipped (unchanged): {replay_skipped:,}")
            print(f"New record rate: {(replay_published/total_records)*100:.1f}%")
            print(f"Replay duration: {replay_duration:.1f}s")
            print(f"Cumulative total published: {cumulative_published:,} messages")
            print(f"Cumulative total skipped: {cumulative_skipped:,} messages")
            print(f"Waiting {restart_delay}s before next replay...")
            print("=" * 70 + "\n")
            
            # Wait 60 seconds before restarting
            time.sleep(restart_delay)
    
    except KeyboardInterrupt:
        print("\n\n" + "=" * 70)
        print("STREAM PUBLISHING STOPPED BY USER")
        print("=" * 70)
        print(f"Total replays completed: {replay_count}")
        print(f"Total messages published: {cumulative_published:,}")
        print(f"Total messages skipped: {cumulative_skipped:,}")
        print("=" * 70 + "\n")
    
    except Exception as e:
        print(f"\n\n✗ ERROR DURING PUBLISHING: {e}")
        print("=" * 70 + "\n")
        raise
    
    finally:
        print("Cleaning up MQTT connection...")
        client.loop_stop()
        client.disconnect()

In [None]:
# Start the continuous stream
publish_facility_stream(
    df=df_filtered,
    topic=MQTT_TOPIC,
    delay=0.1,          # Exactly 0.1 second between messages
    restart_delay=60    # 60 second delay before replay
)


MQTT STREAM PUBLISHING STARTED
Total records per replay: 1,120,059
Message delay: 0.1s (exactly 0.1 second)
Restart delay: 60s (when data exhausted)
Publishing strategy: Only NEW records (changed values)
MQTT topic: szha0285/ass2
MQTT broker: 172.17.34.107:1883
Event time range: 2025-10-01 00:00:00+10:00 to 2025-10-08 23:50:00+10:00

Stream publishing in progress...
(Press Ctrl+C to stop)

______________________________________________________________________
REPLAY #1 - Started at 2025-11-06 18:41:52
______________________________________________________________________

18:42:44 | Progress: 530/1,120,059 (0.0%) | Published: 500 | Skipped: 30 | Rate: 9.7 msg/s
18:43:34 | Progress: 1,610/1,120,059 (0.1%) | Published: 1,000 | Skipped: 610 | Rate: 9.8 msg/s
18:44:25 | Progress: 2,689/1,120,059 (0.2%) | Published: 1,500 | Skipped: 1,189 | Rate: 9.8 msg/s
18:45:15 | Progress: 3,814/1,120,059 (0.3%) | Published: 2,000 | Skipped: 1,814 | Rate: 9.9 msg/s
18:46:05 | Progress: 4,877/1,120,059 