In [1]:
import json
import os
import re
import time
from math import ceil
from time import sleep

import pandas as pd
import requests
from IPython.display import display

# 1. Data Retrieval and Integration

In [2]:
# Base URL of the OpenElectricity v4 REST API.
BASE = "https://api.openelectricity.org.au/v4"

# SECURITY: never commit API tokens to a notebook. The token is read from the
# environment instead (export OPENELECTRICITY_API_KEY=... before starting the
# kernel). Any token that was previously hard-coded here should be treated as
# compromised and rotated. An empty token makes the first request fail with an
# HTTP authorisation error via raise_for_status().
TOKEN = os.environ.get("OPENELECTRICITY_API_KEY", "")

# Network code used in request paths and filters.
NETWORK = "NEM"

# Headers sent with every API request.
HEADERS = {
    "Authorization": f"Bearer {TOKEN}",
    "Accept": "application/json",
}

# Query window passed as date_start / date_end to the time-series endpoints.
# NOTE(review): these strings carry no UTC offset while responses come back
# stamped +10:00 - presumably the API interprets them as network time; confirm.
DATE_START = "2025-10-01T00:00:00"
DATE_END   = "2025-10-08T00:00:00"


In [3]:
# Get all operating facilities (and their units) for the configured network
def get_nem_facilities():
    """Download the facility catalogue and return it at two granularities.

    Sends one GET to ``{BASE}/facilities/`` filtered to the configured
    ``NETWORK`` and to facilities whose status is ``operating``.

    Returns:
        tuple[pd.DataFrame, pd.DataFrame]: ``(fac_df, units_df)`` where
        ``fac_df`` has one row per facility (nested fields flattened with
        dots, e.g. ``location.lat``) and ``units_df`` has one row per unit
        with the unit code in ``unit_code`` and the owning facility's code
        in ``facility_code``.

    Raises:
        requests.HTTPError: if the API answers with an error status.
        KeyError: if the response has no ``data`` key.
    """
    url = f"{BASE}/facilities/"
    # Per the original author's note on the API docs: network_id accepts a
    # list, and status_id / fueltech_id can be used for finer filtering.
    params = {
        "network_id": [NETWORK],
        "status_id": ["operating"],
    }
    r = requests.get(url, headers=HEADERS, params=params, timeout=60)
    r.raise_for_status()
    facilities = r.json()["data"]

    # Facility level: one row per facility.
    fac_df = pd.json_normalize(facilities)

    # Unit level: explode each facility's "units" list into rows. The
    # facility's own "code" is carried along as metadata and prefixed so it
    # does not collide with the unit's "code" column.
    units_df = pd.json_normalize(
        facilities,
        record_path="units",
        meta=["code"],
        meta_prefix="facility_",
        errors="ignore",
    ).rename(columns={"code": "unit_code"})

    return fac_df, units_df

# Fetch the facility and unit tables once; later cells reuse both frames.
fac_df, units_df = get_nem_facilities()

# Preview the first unit rows.
units_df.head()

Unnamed: 0,unit_code,fueltech_id,status_id,capacity_registered,capacity_maximum,data_first_seen,data_last_seen,dispatch_type,created_at,updated_at,capacity_storage,emissions_factor_co2,facility_code
0,ADPPV1,solar_utility,operating,24.75,19.0,2021-05-18T13:10:00+10:00,2025-11-07T15:00:00+10:00,GENERATOR,2023-10-18T04:34:30Z,2024-12-16T23:52:12Z,,,ADP
1,ADPPV2,solar_utility,operating,0.2,0.2,,,GENERATOR,2023-10-18T04:34:30Z,2024-12-16T23:50:10Z,,,ADP
2,ADPPV3,solar_utility,operating,0.02,0.02,,,GENERATOR,2023-10-18T04:34:30Z,2024-12-16T23:51:11Z,,,ADP
3,ADPBA1G,battery_discharging,operating,7.76,6.15,2021-05-18T10:55:00+10:00,2025-11-07T15:00:00+10:00,GENERATOR,2023-10-18T04:34:30Z,2025-06-23T05:34:25Z,12.6,,ADP
4,ADPBA1L,battery_charging,operating,7.76,6.15,2021-05-18T09:55:00+10:00,2025-11-07T14:55:00+10:00,LOAD,2023-10-18T04:34:30Z,2025-06-23T05:34:16Z,12.6,,ADP


In [4]:
# Preview the facility-level table (one row per facility).
fac_df.head()

Unnamed: 0,code,name,network_id,network_region,description,units,updated_at,created_at,location.lat,location.lng
0,ADP,Adelaide Desalination,NEM,SA1,"<p>The Adelaide Desalination plant (ADP), form...","[{'code': 'ADPPV1', 'fueltech_id': 'solar_util...",2025-08-05T06:08:12Z,2023-10-18T04:34:30Z,-35.096948,138.484061
1,ALDGASF,Aldoga,NEM,QLD1,<p>The Aldoga Solar Farm will be approximately...,"[{'code': 'ALDGASF1', 'fueltech_id': 'solar_ut...",2025-03-25T00:52:44Z,2025-01-31T04:19:33Z,-23.839544,151.0849
2,ANGASTON,Angaston,NEM,SA1,<p>Angaston Power Station is a diesel-powered ...,"[{'code': 'ANGAST1', 'fueltech_id': 'distillat...",2025-09-07T01:53:13Z,2023-10-18T04:34:32Z,-34.503948,139.024296
3,APPIN,Appin,NEM,NSW1,"<p>In a world first, EDL developed the largest...","[{'code': 'APPIN', 'fueltech_id': 'gas_wcmg', ...",2025-09-07T01:53:15Z,2023-10-18T04:34:32Z,-34.210868,150.792711
4,ARWF,Ararat,NEM,VIC1,<p>Ararat Wind Farm is wind farm in western Vi...,"[{'code': 'ARWF1', 'fueltech_id': 'wind', 'sta...",2025-07-08T03:42:06Z,2023-10-18T04:34:32Z,-37.263393,143.082116


In [5]:
# Fetch time-series data for a specific set of facilities
def fetch_facility_timeseries(facility_codes, date_start=DATE_START, date_end=DATE_END):
    """Request the power and emissions metrics at 5-minute interval for a batch of facility codes.

    Args:
        facility_codes: facility codes sent together in a single request.
        date_start: start of the query window (defaults to DATE_START).
        date_end: end of the query window (defaults to DATE_END).

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: if the API answers with an error status.
    """
    query = dict(
        metrics=["power", "emissions"],
        interval="5m",
        facility_code=facility_codes,  # whole batch in one call
        date_start=date_start,
        date_end=date_end,
    )
    response = requests.get(
        f"{BASE}/data/facilities/{NETWORK}",
        headers=HEADERS,
        params=query,
        timeout=120,
    )
    response.raise_for_status()
    return response.json()

def timeseries_to_df(js):
    """Flatten a facility time-series response into a long table.

    The result always has the columns
    ``timestamp | unit_code | metric | value | unit`` - also when the
    response contains no usable data - so downstream merges and pivots on
    those columns keep working for empty batches.

    Args:
        js: decoded JSON response. Expected to be a dict whose ``data`` entry
            is a list of metric blocks, each with ``metric``, ``unit`` and a
            ``results`` list; every result holds ``[timestamp, value]`` pairs
            under ``data``. Anything not matching that shape is skipped.

    Returns:
        pd.DataFrame: one row per (unit, metric, timestamp) observation.
    """
    columns = ["timestamp", "unit_code", "metric", "value", "unit"]
    rows = []

    data_block = js.get("data", []) if isinstance(js, dict) else []
    if not isinstance(data_block, list):
        # Per the original author's note the API may return a placeholder
        # (e.g. '-') instead of a list; treat that as "no data".
        data_block = []

    for blk in data_block:
        if not isinstance(blk, dict):
            continue
        metric = blk.get("metric")  # e.g. 'power' / 'emissions'
        unit = blk.get("unit")      # measurement unit reported by the API, e.g. 'MW'

        # `or []` also covers an explicit null for "results".
        for res in blk.get("results") or []:
            if not isinstance(res, dict):
                continue

            # Prefer the unit code from "columns"; otherwise derive it from
            # the result name, e.g. power_ADPBA1 -> ADPBA1.
            unit_code = None
            cols = res.get("columns")
            if isinstance(cols, dict):
                unit_code = cols.get("unit_code")
            name = res.get("name")
            if not unit_code and isinstance(name, str) and "_" in name:
                unit_code = name.split("_", 1)[1]

            for item in res.get("data") or []:  # item = [timestamp, value]
                if isinstance(item, (list, tuple)) and len(item) >= 2:
                    rows.append({
                        "timestamp": item[0],
                        "unit_code": unit_code,
                        "metric": metric,
                        "value": item[1],
                        "unit": unit,
                    })

    return pd.DataFrame(rows, columns=columns)

# Collect the facility_code list first (filter by fueltech / network_region
# beforehand if a smaller scope is wanted).
facility_codes = fac_df["code"].dropna().unique().tolist()

# Daily quota consideration: a larger batch means fewer requests, at the cost
# of a larger response per request.
BATCH   = 30
SLEEP_S = 0.25

# One flattened DataFrame per batch, plus a counter of requests issued.
all_parts = []
req_count = 0

# Walk the facility codes in slices of BATCH, pausing SLEEP_S seconds after
# each request.
for i in range(0, len(facility_codes), BATCH):
    batch = facility_codes[i:i+BATCH]
    js = fetch_facility_timeseries(batch)
    req_count += 1
    df_part = timeseries_to_df(js)
    all_parts.append(df_part)
    time.sleep(SLEEP_S)

# Stack all batches into one long table (empty frame if nothing was fetched).
df_unit = pd.concat(all_parts, ignore_index=True) if all_parts else pd.DataFrame()
print("requests used:", req_count, "rows:", len(df_unit))
display(df_unit.head())

requests used: 14 rows: 1958730


Unnamed: 0,timestamp,unit_code,metric,value,unit
0,2025-10-01T00:00:00+10:00,ADPBA1,power,-0.004,MW
1,2025-10-01T00:05:00+10:00,ADPBA1,power,-0.046,MW
2,2025-10-01T00:10:00+10:00,ADPBA1,power,0.0,MW
3,2025-10-01T00:15:00+10:00,ADPBA1,power,0.003,MW
4,2025-10-01T00:20:00+10:00,ADPBA1,power,-0.018,MW


In [6]:
# Build the unit_code -> facility_code lookup table.
unit_to_fac = units_df[["unit_code", "facility_code"]].dropna().drop_duplicates()

# Attach the facility code to every unit observation. Any facility_code column
# left behind by an earlier run of this cell is dropped first, so re-running
# the cell does not create facility_code_x / facility_code_y and break the
# pivot below.
# NOTE(review): units with no match in unit_to_fac get a NaN facility_code and
# appear to be excluded by pivot_table's default NaN-key handling - confirm
# whether the time-series unit codes all exist in units_df.
df_unit = (
    df_unit
    .drop(columns=["facility_code"], errors="ignore")
    .merge(unit_to_fac, on="unit_code", how="left")
)

# Per-facility wide table: one row per timestamp and facility, with one column
# per metric (power / emissions); values of a facility's units are summed.
df_fac_wide = (
    df_unit
    .pivot_table(index=["timestamp", "facility_code"], columns="metric", values="value", aggfunc="sum")
    .rename_axis(columns=None)
    .reset_index()
    .sort_values(["timestamp", "facility_code"])
)

# Pick the descriptive facility columns needed downstream.
fac_info = fac_df[["code", "name", "location.lat", "location.lng", "network_region"]].rename(
    columns={"code": "facility_code", "name": "facility_name"}
)

# Left join so every row of df_fac_wide is kept.
df_fac_wide = df_fac_wide.merge(fac_info, on="facility_code", how="left")

# Preview the result.
display(df_fac_wide.head())


Unnamed: 0,timestamp,facility_code,emissions,power,facility_name,location.lat,location.lng,network_region
0,2025-10-01T00:00:00+10:00,0MREH,0.0,0.0,Melbourne A1,-37.661274,144.726302,VIC1
1,2025-10-01T00:00:00+10:00,0MREHA2,0.0,0.0,Melbourne A2,-37.663934,144.726927,VIC1
2,2025-10-01T00:00:00+10:00,0TARONGBESS,0.0,0.0,Tarong,-26.780051,151.912068,QLD1
3,2025-10-01T00:00:00+10:00,0WAMBOWF,0.0,65.23,Wambo,-26.603045,151.246876,QLD1
4,2025-10-01T00:00:00+10:00,ADP,0.0,0.0,Adelaide Desalination,-35.096948,138.484061,SA1


In [7]:
def fetch_market_network(network_region=None,date_start=DATE_START, date_end=DATE_END):
    """Request 5-minute price and demand for the configured network, grouped by region.

    Args:
        network_region: optional region filter; only sent when truthy.
        date_start: start of the query window (defaults to DATE_START).
        date_end: end of the query window (defaults to DATE_END).

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: if the API answers with an error status.
    """
    query = dict(
        metrics=["price", "demand"],
        interval="5m",
        date_start=date_start,
        date_end=date_end,
        primary_grouping="network_region",  # "network" or "network_region"
    )
    if network_region:
        query.update(network_region=network_region)

    response = requests.get(
        f"{BASE}/market/network/{NETWORK}",
        headers=HEADERS,
        params=query,
        timeout=60,
    )
    response.raise_for_status()
    return response.json()

# -------- 2) Flatten into a long DataFrame (timestamp / network_region / metric / value / unit) --------
def market_to_df(js):
    """Flatten a market (price / demand) response into a long table.

    The result always has the columns
    ``timestamp | network_region | metric | value | unit`` - also when the
    response holds no usable data - so the pivot that follows keeps working.

    The region is read from ``columns["region"]`` or
    ``columns["network_region"]``; when neither is present it is parsed from
    the result name (e.g. ``price_NSW1``). Parsing relies on the ``re``
    module, which must be imported at the top of the notebook.

    Args:
        js: decoded JSON response; entries that are not dicts/lists of the
            expected shape are skipped.

    Returns:
        pd.DataFrame: one row per (region, metric, timestamp) observation.
    """
    columns = ["timestamp", "network_region", "metric", "value", "unit"]
    rows = []

    data_block = js.get("data", []) if isinstance(js, dict) else []
    if not isinstance(data_block, list):
        data_block = []

    for blk in data_block:
        if not isinstance(blk, dict):
            continue
        metric = blk.get("metric")         # e.g. "price" / "demand"
        unit = blk.get("unit")             # measurement unit reported by the API

        # `or []` also covers an explicit null for "results".
        for res in blk.get("results") or []:
            if not isinstance(res, dict):
                continue
            cols = res.get("columns")
            if not isinstance(cols, dict):
                cols = {}
            name = res.get("name") or ""

            # Preferred source: the "columns" mapping, e.g. {"region": "NSW1"}.
            region = cols.get("region") or cols.get("network_region")

            # Fallback: parse the region out of the result name ("price_NSW1").
            if region is None and isinstance(name, str):
                m = re.search(r'_(NSW1|VIC1|QLD1|SA1|TAS1)\b', name)
                region = m.group(1) if m else None

            # Expand this region's time-series points.
            for item in res.get("data") or []:
                if isinstance(item, (list, tuple)) and len(item) >= 2:
                    rows.append({
                        "timestamp": item[0],
                        "network_region": region,
                        "metric": metric,
                        "value": item[1],
                        "unit": unit,
                    })

    return pd.DataFrame(rows, columns=columns)


# Fetch price and demand for every region over the configured window.
js = fetch_market_network()

# Flatten the response into a long table.
df_market = market_to_df(js)

# Wide table: one row per timestamp and region, with demand / price as columns.
df_market_wide = (
    df_market
    .pivot_table(
        index=["timestamp", "network_region"],  # one row per region and timestamp
        columns="metric",                      # each metric (price, demand) becomes a column
        values="value",                        # the numeric column
        aggfunc="mean"                         # duplicates for the same key are averaged
    )
    .rename_axis(columns=None)                 # drop the leftover axis name
    .reset_index()                             # turn the index back into ordinary columns
    .sort_values(["timestamp", "network_region"])
)
display(df_market_wide.head(10))

Unnamed: 0,timestamp,network_region,demand,price
0,2025-10-01T00:00:00+10:00,NSW1,7105.57,56.98
1,2025-10-01T00:00:00+10:00,QLD1,5989.24,54.82
2,2025-10-01T00:00:00+10:00,SA1,1564.92,8.11
3,2025-10-01T00:00:00+10:00,TAS1,898.71,0.12
4,2025-10-01T00:00:00+10:00,VIC1,4893.49,8.95
5,2025-10-01T00:05:00+10:00,NSW1,7170.68,80.01
6,2025-10-01T00:05:00+10:00,QLD1,5920.4,67.3
7,2025-10-01T00:05:00+10:00,SA1,1565.38,0.01
8,2025-10-01T00:05:00+10:00,TAS1,897.18,0.2
9,2025-10-01T00:05:00+10:00,VIC1,4889.73,0.01


In [8]:
# Attach regional demand and price to every facility row, then rename the
# region column to "market(network_region)".
# The guard keeps the cell re-runnable: after the first run the
# "network_region" column no longer exists, and merging on it again would
# raise a KeyError.
if "network_region" in df_fac_wide.columns:
    df_fac_wide = (
        df_fac_wide
        .merge(
            df_market_wide[["timestamp", "network_region", "demand", "price"]],
            on=["timestamp", "network_region"],
            how="left"
        )
        .rename(columns={"network_region": "market(network_region)"})
    )
df_fac_wide.head()

Unnamed: 0,timestamp,facility_code,emissions,power,facility_name,location.lat,location.lng,market(network_region),demand,price
0,2025-10-01T00:00:00+10:00,0MREH,0.0,0.0,Melbourne A1,-37.661274,144.726302,VIC1,4893.49,8.95
1,2025-10-01T00:00:00+10:00,0MREHA2,0.0,0.0,Melbourne A2,-37.663934,144.726927,VIC1,4893.49,8.95
2,2025-10-01T00:00:00+10:00,0TARONGBESS,0.0,0.0,Tarong,-26.780051,151.912068,QLD1,5989.24,54.82
3,2025-10-01T00:00:00+10:00,0WAMBOWF,0.0,65.23,Wambo,-26.603045,151.246876,QLD1,5989.24,54.82
4,2025-10-01T00:00:00+10:00,ADP,0.0,0.0,Adelaide Desalination,-35.096948,138.484061,SA1,1564.92,8.11


# 2. Data Preprocessing

In [9]:
# Summarise column dtypes and non-null counts of the merged facility table.
df_fac_wide.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 668353 entries, 0 to 668352
Data columns (total 10 columns):
 #   Column                  Non-Null Count   Dtype  
---  ------                  --------------   -----  
 0   timestamp               668353 non-null  object 
 1   facility_code           668353 non-null  object 
 2   emissions               668353 non-null  float64
 3   power                   668353 non-null  float64
 4   facility_name           668353 non-null  object 
 5   location.lat            668353 non-null  float64
 6   location.lng            668353 non-null  float64
 7   market(network_region)  668353 non-null  object 
 8   demand                  667033 non-null  float64
 9   price                   654823 non-null  float64
dtypes: float64(6), object(4)
memory usage: 51.0+ MB


In [10]:
# Parse the timestamps as UTC instants, then express them in the
# Australia/Sydney time zone.
df_fac_wide["timestamp"] = (
    pd.to_datetime(df_fac_wide["timestamp"], utc=True)
    .dt.tz_convert("Australia/Sydney")
)

# Store the text columns with pandas' dedicated string dtype.
df_fac_wide = df_fac_wide.astype({
    "facility_code": "string",
    "facility_name": "string",
    "market(network_region)": "string",
})

In [11]:
# Rows whose regional demand is below zero.
neg_demand = df_fac_wide.loc[df_fac_wide["demand"].lt(0)]
print(f"Negative demand rows: {len(neg_demand)}")
display(neg_demand.head())

Negative demand rows: 290


Unnamed: 0,timestamp,facility_code,emissions,power,facility_name,location.lat,location.lng,market(network_region),demand,price
333607,2025-10-04 12:15:00+10:00,ADP,0.0,0.088,Adelaide Desalination,-35.096948,138.484061,SA1,-12.17,-31.19
333608,2025-10-04 12:15:00+10:00,AGLHAL,0.0,0.0,Hallett,-33.34931,138.752633,SA1,-12.17,-31.19
333611,2025-10-04 12:15:00+10:00,ANGASTON,0.0,0.0,Angaston,-34.503948,139.024296,SA1,-12.17,-31.19
333620,2025-10-04 12:15:00+10:00,BARKIPS,0.0,0.0,Barker Inlet,-34.804,138.524,SA1,-12.17,-31.19
333632,2025-10-04 12:15:00+10:00,BLUFF,0.0,0.0,The Bluff,-33.367741,138.79379,SA1,-12.17,-31.19


In [12]:
# Rows whose regional demand is below zero, and their share of all rows.
neg_demand = df_fac_wide.loc[df_fac_wide["demand"].lt(0)]
print(f"Negative demand rows: {len(neg_demand)}")

# Percentage of rows with negative demand.
ratio = len(neg_demand) / len(df_fac_wide) * 100
print(f"Negative demand ratio (%): {round(ratio, 2)}")

display(neg_demand.head())

Negative demand rows: 290
Negative demand ratio (%): 0.04


Unnamed: 0,timestamp,facility_code,emissions,power,facility_name,location.lat,location.lng,market(network_region),demand,price
333607,2025-10-04 12:15:00+10:00,ADP,0.0,0.088,Adelaide Desalination,-35.096948,138.484061,SA1,-12.17,-31.19
333608,2025-10-04 12:15:00+10:00,AGLHAL,0.0,0.0,Hallett,-33.34931,138.752633,SA1,-12.17,-31.19
333611,2025-10-04 12:15:00+10:00,ANGASTON,0.0,0.0,Angaston,-34.503948,139.024296,SA1,-12.17,-31.19
333620,2025-10-04 12:15:00+10:00,BARKIPS,0.0,0.0,Barker Inlet,-34.804,138.524,SA1,-12.17,-31.19
333632,2025-10-04 12:15:00+10:00,BLUFF,0.0,0.0,The Bluff,-33.367741,138.79379,SA1,-12.17,-31.19


In [13]:
# Rows whose regional price is below zero.
neg_price = df_fac_wide.loc[df_fac_wide["price"].lt(0)]
print(f"Negative price rows: {len(neg_price)}")
display(neg_price.head())

Negative price rows: 279826


Unnamed: 0,timestamp,facility_code,emissions,power,facility_name,location.lat,location.lng,market(network_region),demand,price
7260,2025-10-01 01:50:00+10:00,0MREH,0.0,1.4704,Melbourne A1,-37.661274,144.726302,VIC1,4300.21,-0.06
7261,2025-10-01 01:50:00+10:00,0MREHA2,0.0,0.0,Melbourne A2,-37.663934,144.726927,VIC1,4300.21,-0.06
7264,2025-10-01 01:50:00+10:00,ADP,0.0,0.0,Adelaide Desalination,-35.096948,138.484061,SA1,1513.45,-0.05
7265,2025-10-01 01:50:00+10:00,AGLHAL,0.0,0.0,Hallett,-33.34931,138.752633,SA1,1513.45,-0.05
7266,2025-10-01 01:50:00+10:00,AGLSOM,0.0,0.0,Somerton,-37.630949,144.953098,VIC1,4300.21,-0.06


In [14]:
# Persist the merged per-facility table to CSV, without the index column.
df_fac_wide.to_csv("nem_per_facility_power_emissions_5m_2025-10-01_to_08.csv", index=False)

# 3. Data Publishing via MQTT

In [15]:
# MQTT broker settings
import socket

# Unified topic used across publish/subscribe
BROKER_HOST = "test.mosquitto.org" # mqtt broker
BROKER_PORT = 1883                 # mqtt port
TOPIC       = "nem/yjia0057/power_emissions"  # unified topic
LOCAL_TZ    = "Australia/Sydney"   # time zone used for published timestamps

# Quick reachability probe: open a TCP connection with a 3 second timeout and
# close it again straight away. The context manager closes the socket on every
# path; a connection failure propagates as an exception so the problem is
# visible before any MQTT client is created.
with socket.create_connection((BROKER_HOST, BROKER_PORT), timeout=3):
    print(f"MQTT broker reachable: {BROKER_HOST}:{BROKER_PORT}")




MQTT broker reachable: test.mosquitto.org:1883


In [16]:
import json, uuid
import paho.mqtt.client as mqtt

# Try the paho-mqtt v2 constructor first, then fall back to the v1 form
def _make_client(cid: str):
    """Create an MQTT 3.1.1 client over TCP with the given client id.

    The client is first built with ``callback_api_version`` set to VERSION2;
    if that attempt raises, it is built again without that argument.
    """
    shared_kwargs = dict(client_id=cid, protocol=mqtt.MQTTv311, transport="tcp")
    try:
        client = mqtt.Client(
            callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
            **shared_kwargs,
        )
    except Exception:
        client = mqtt.Client(**shared_kwargs)
    return client

def start_subscriber(topic=TOPIC, host=BROKER_HOST, port=BROKER_PORT):
    """Start a background subscriber that prints the first 10 messages it receives.

    Args:
        topic: topic to subscribe to (QoS 1).
        host: broker host name.
        port: broker TCP port.

    Returns:
        The connected client with its network loop already started; call
        ``loop_stop()`` and ``disconnect()`` on it to shut it down.
    """
    c = _make_client(f"nem-sub-{uuid.uuid4().hex[:6]}")
    cnt = 0  # number of messages seen so far

    def on_connect(client, userdata, flags, rc, properties=None):
        # `properties` defaults to None so the same callback fits both
        # callback signatures produced by _make_client.
        if rc != 0:
            print(f"rc={rc}")
            return  # connection refused: subscribing would be pointless
        print("connected, rc= Success")
        client.subscribe(topic, qos=1)

    def on_message(client, userdata, msg):
        nonlocal cnt
        cnt += 1
        if cnt > 10:
            return
        try:
            print("[sub] sample:", json.loads(msg.payload.decode("utf-8")))
        except ValueError:
            # Covers both undecodable bytes (UnicodeDecodeError) and invalid
            # JSON (JSONDecodeError); show a raw prefix of the payload instead.
            print("[sub] raw:", msg.payload[:160])

    c.on_connect = on_connect
    c.on_message = on_message
    c.reconnect_delay_set(1, 10)
    c.connect(host, port, keepalive=60)
    c.loop_start()
    return c

# Avoid a duplicate subscriber: stop the one left from an earlier run of this
# cell (if any) before starting a new one.
try:
    sub.loop_stop()
    sub.disconnect()
except NameError:
    pass  # first run: no previous subscriber exists yet
except Exception as exc:
    # Best effort only - a failure to stop the old client must not prevent
    # the new one from starting.
    print(f"[sub] ignoring error while stopping previous subscriber: {exc}")
sub = start_subscriber()

In [17]:
import time, json, uuid
import pandas as pd

_REGION_CANDS = ["network_region", "market(network_region)", "region"]

 # pick first existing region col
def _detect_region_col(df: pd.DataFrame) -> str:
    for c in _REGION_CANDS:
        if c in df.columns: return c
    raise KeyError("region column not found")

def _best_time_col(df: pd.DataFrame) -> str:
    return "event_time_utc" if "event_time_utc" in df.columns else "timestamp"

 # convert any ts to Sydney local time (+10/+11)
def _to_local_sydney(ts_val):
    ts = pd.to_datetime(ts_val, errors="coerce")
    if pd.isna(ts): 
        return pd.NaT
    if getattr(ts, "tz", None) is None:       
        try:
            return ts.tz_localize(LOCAL_TZ, ambiguous="infer", nonexistent="shift_forward")
        except Exception:
            return ts.tz_localize(LOCAL_TZ)
    else:
        return ts.tz_convert(LOCAL_TZ)
        
# mqtt publisher client
def _connect_client(host=BROKER_HOST, port=BROKER_PORT, user="", password=""):
    """Create a publisher client and connect it to the broker.

    Args:
        host: broker host name.
        port: broker TCP port.
        user: optional user name; credentials are only set when non-empty.
        password: password used together with ``user``.

    Returns:
        The connected client. The network loop is NOT started here; the
        caller is expected to call ``loop_start()``.
    """
    # Reuse the shared factory so publisher and subscriber clients are built
    # the same way.
    cli = _make_client(f"nem-pub-{uuid.uuid4().hex[:8]}")
    if user:
        cli.username_pw_set(user, password or "")
    cli.reconnect_delay_set(1, 10)
    try:
        # Last-will message, registered before connecting.
        cli.will_set("nem/system/peer", payload="offline", qos=1, retain=False)
    except (ValueError, TypeError) as exc:
        # The will is optional: report the problem and carry on without it.
        print(f"[publisher] last-will not set: {exc}")
    cli.connect(host, port, keepalive=60)
    return cli

 # light rename
def prepare_for_publish(df: pd.DataFrame) -> pd.DataFrame:
    dfx = df.copy()
    dfx.rename(columns={
        "location.lat": "lat", "location.lng": "lon",
        "latitude": "lat", "longitude": "lon",
        "market(network_region)": "network_region",
    }, inplace=True)
    try:
        if 'df_facilities' in globals():
            meta = df_facilities.copy()
            meta.rename(columns={
                "location.lat": "lat", "location.lng": "lon",
                "latitude": "lat", "longitude": "lon",
                "market(network_region)": "network_region",
            }, inplace=True)
            keep = [c for c in ["facility_code","lat","lon","network_region"] if c in meta.columns]
            if "facility_code" in keep:
                meta = meta[keep].drop_duplicates("facility_code")
                dfx = dfx.merge(meta, on="facility_code", how="left", suffixes=("","_meta"))
                for c in ["lat","lon","network_region"]:
                    if c+"_meta" in dfx.columns:
                        if c in dfx.columns: dfx[c] = dfx[c].combine_first(dfx[c+"_meta"])
                        else: dfx[c] = dfx[c+"_meta"]
                drop_cols = [c for c in dfx.columns if c.endswith("_meta")]
                if drop_cols: dfx.drop(columns=drop_cols, inplace=True)
    except Exception:
        pass
    return dfx

def _row_to_payload(row: pd.Series, region_col: str, seq: int) -> dict:
    """Build the JSON-ready message dict for one data row.

    Args:
        row: mapping-like row; only ``.get`` is used, so a Series or a plain
            dict both work.
        region_col: name of the column holding the region.
        seq: publish sequence number stored in the message.

    Returns:
        dict with the keys timestamp, facility_code, facility_name, region,
        lat, lon, power_mw, co2_t, price, demand and seq. Missing values
        (NaN / NaT / pd.NA / None) are emitted as ``None`` so that
        ``json.dumps`` writes ``null`` rather than the non-standard ``NaN``.
    """
    def _clean(value):
        # Only scalars are checked; anything else is passed through untouched.
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return None
        return value

    # Prefer the explicit UTC event time when the row has one.
    tsv = row.get("event_time_utc", row.get("timestamp"))
    t_loc = _to_local_sydney(tsv)
    ts_iso = t_loc.isoformat() if pd.notna(t_loc) else None

    # Accept any of the coordinate column spellings used in this notebook.
    lat = row.get("lat", row.get("location.lat", row.get("latitude", None)))
    lon = row.get("lon", row.get("lng", row.get("location.lng", row.get("longitude", None))))

    return {
        "timestamp": ts_iso,
        "facility_code": _clean(row.get("facility_code")),
        "facility_name": _clean(row.get("facility_name")),
        "region": _clean(row.get(region_col)),
        "lat": _clean(lat), "lon": _clean(lon),
        "power_mw": _clean(row.get("power_mw", row.get("power"))),
        "co2_t": _clean(row.get("co2_t", row.get("emissions"))),
        "price": _clean(row.get("price")), "demand": _clean(row.get("demand")),
        "seq": seq  # publish seq
    }



In [18]:
def publish_from_df_progress(
    df: pd.DataFrame,
    host=BROKER_HOST, port=BROKER_PORT,
    topic=TOPIC, delay=0.1, also_split=True,
    user="", password="", log_every=100,
    skip_no_coords=False
) -> int:
    """Publish every row of ``df`` as a JSON message, in event-time order.

    Rows are sorted by local event time and then facility code (stable sort)
    and published with QoS 1 to ``topic``. With ``also_split`` each message is
    additionally published to ``nem/reg/<region>/<facility>/power_emissions``.
    The function sleeps ``delay`` seconds after every publish.

    Args:
        df: rows to publish.
        host, port: broker address.
        topic: main topic.
        delay: pause in seconds after each publish call.
        also_split: also publish to the per-region/per-facility topic.
        user, password: optional broker credentials.
        log_every: print progress every N rows (falsy disables logging).
        skip_no_coords: skip rows whose lat or lon is missing (None or NaN).

    Returns:
        int: number of rows actually published. This equals ``len(df)``
        unless rows were skipped because of ``skip_no_coords``.
    """
    # sort by Sydney local time then facility_code
    dfx = prepare_for_publish(df)

    tcol = _best_time_col(dfx)
    dfx["__event_local__"] = dfx[tcol].apply(_to_local_sydney)
    dfx = (
        dfx.sort_values(["__event_local__", "facility_code"], kind="mergesort")
        .drop(columns="__event_local__")
        .reset_index(drop=True)
    )

    region_col = _detect_region_col(dfx)
    columns = list(dfx.columns)
    pause = float(delay)

    # connect and start loop
    cli = _connect_client(host=host, port=port, user=user, password=password)
    cli.loop_start()

    n = len(dfx)
    published = 0
    print(f"[publisher] start: total rows = {n}, topic = {topic}, delay = {delay}s")

    try:
        # Plain tuples zipped with the real column names: unlike
        # itertuples()._asdict(), this keeps names that are not valid Python
        # identifiers instead of silently renaming them.
        for i, values in enumerate(dfx.itertuples(index=False, name=None), start=1):
            payload = _row_to_payload(dict(zip(columns, values)), region_col, i)
            # pd.isna catches both None and NaN coordinates.
            if skip_no_coords and (pd.isna(payload.get("lat")) or pd.isna(payload.get("lon"))):
                continue
            j = json.dumps(payload, ensure_ascii=False)

            # main topic
            cli.publish(topic, j, qos=1, retain=False)
            time.sleep(pause)

            # split by region/facility ("/" would add topic levels, so it is replaced)
            if also_split:
                reg = str(payload.get("region", "NA")).replace("/", "-")
                fac = str(payload.get("facility_code", "NA")).replace("/", "-")
                split_topic = f"nem/reg/{reg}/{fac}/power_emissions"
                cli.publish(split_topic, j, qos=1, retain=False)
                time.sleep(pause)  # same pause for the split topic

            published += 1
            if log_every and i % log_every == 0:
                print(f"[publisher] {i}/{n} ({i/n:5.2%})")
    finally:
        # Always release the client - also on KeyboardInterrupt or an error -
        # so no background network thread or broker connection is left behind.
        cli.loop_stop()
        cli.disconnect()

    print("[publisher] done.")
    return published

In [30]:
# Take the first 300 rows in (time, facility) order as a small demo sample;
# the final local-time sort happens inside the publisher.
tcol = "event_time_utc" if "event_time_utc" in df_fac_wide.columns else "timestamp"
sample = df_fac_wide.sort_values([tcol, "facility_code"]).head(300).copy()

# Publish the sample once to the main topic and to the split topics.
publish_from_df_progress(
    sample,
    host=BROKER_HOST, port=BROKER_PORT, topic=TOPIC,
    delay=0.1, also_split=True,
    log_every=100,
    skip_no_coords=False
)


[publisher] start: total rows = 300, topic = nem/yjia0057/power_emissions, delay = 0.1s
[publisher] 100/300 (33.33%)
[publisher] 200/300 (66.67%)
[publisher] 300/300 (100.00%)
[publisher] done.


300

# 5. Continuous Execution

In [31]:
# ============================================================
# Task 5 – Continuous Execution Runner
# ============================================================

import time

def run_continuous_publish(
    df,
    *,
    rounds: int | None = None,   # 要跑多少轮；None = 无限轮
    round_sleep: int = 60,       # 每轮结束之后再等 60 秒
    host: str = "test.mosquitto.org",
    port: int = 1883,
    topic: str = "nem/yjia0057/power_emissions",
    delay: float = 0.1,          # 单条 0.1s
    also_split: bool = True,
    log_every: int = 50000,
    skip_no_coords: bool = False
):
    """
    Task 5：
      - 使用同一份 df 周期性重播，模拟无界数据流
      - 每条消息之间 delay 秒
      - 每轮播完后等待 round_sleep 秒，再从头开始下一轮
    """
    round_idx = 0
    try:
        while True:
            round_idx += 1
            print(f"\n[runner] round {round_idx} start")

            # ① 播放一整轮 df
            sent = publish_from_df_progress(
                df,
                host=host,
                port=port,
                topic=topic,
                delay=delay,
                also_split=also_split,
                log_every=log_every,
                skip_no_coords=skip_no_coords,
            )
            print(f"[runner] round {round_idx} done: published {sent} rows")

            # 判断轮数是否已完成
            if (rounds is not None) and (round_idx >= rounds):
                print("[runner] finished all rounds.")
                break

            # 轮与轮之间额外等待 60 秒
            print(f"[runner] sleeping {round_sleep}s before next round ...")
            time.sleep(round_sleep)

    except KeyboardInterrupt:
        print("\n[runner] stopped by user.")


In [32]:
# Replay the 300-row sample indefinitely; interrupt the kernel to stop.
run_continuous_publish(
    df=sample,
    rounds=None,          # None = run indefinitely
    round_sleep=60,       # 60 seconds between rounds
    host=BROKER_HOST,
    port=BROKER_PORT,
    delay=0.1,            # 0.1 seconds after each message
    log_every=200,
    skip_no_coords=False
)



[runner] round 1 start
[publisher] start: total rows = 300, topic = nem/yjia0057/power_emissions, delay = 0.1s

[runner] stopped by user.
