Import necessary libraries

In [18]:
# Import necessary libraries
import requests
import pandas as pd
import json
from datetime import datetime, timezone
import time
import csv
import paho.mqtt.client as mqtt

# For Generate Unique Transaction Id for accesing API
import uuid

import config_secret

Import API Key

Import API Key Instructions

1. Go to the [NSW API Fuel documentation](https://api.nsw.gov.au/Product/Index/22#v-pills-doc) to get the **API Key** and **Authorization Header**.  
2. Subscribe to the Fuel API by creating an NSW API account:  
   - Create a new app with your chosen **App Name** and **App Description**.  
   - Under **Add an API Product**, select **Fuel API**.  
   - Scroll to the bottom of the page and click **Add app**.  
   - You will receive your **API Key**, **API Secret**, and **Authorization Header**.  
3. In the home directory of your assignment folder, create a file named `config_secret.py`.  
   - _Note:_ Place `config_secret.py` in the same directory as `COMP5339AS02.ipynb`.  
4. Open `config_secret.py` and add your `API_KEY` and `AuthorizationHeader`.  
   - A sample file has been provided as `config_secret_sample.py`.  

In [19]:
# grab your key from config_secret.py
API_KEY = config_secret.API_KEY
AuthorizationHeader = config_secret.AuthorizationHeader

1. Data Retrieval

In [20]:
# 1. Data Retrieval

# Fuel API Document
# https://api.nsw.gov.au/Product/Index/22#v-pills-doc

class FuelPriceCheckAPI:
    def __init__(self, API_KEY, AuthorizationHeader):
        self.url_base = "https://api.onegov.nsw.gov.au" # Base Url
        self.API_KEY = API_KEY
        self.AuthorizationHeader = AuthorizationHeader

    
    def get_datetime_now():
        return datetime.now(timezone.utc).strftime("%d/%m/%Y %I:%M:%S %p")
    

    def get_unique_transactionId():
        return str(uuid.uuid4())

    def get_accesstoken(self):
        # Config url api prameters for getting accesstoken
        url_subpart = "/oauth/client_credential/accesstoken"

        url = self.url_base + url_subpart

        query_getAccessToken = {"grant_type":"client_credentials"}

        headers_getAccessToken = {'Authorization': self.AuthorizationHeader}

        # Get the web result for accesstoken
        response = requests.get(url, headers=headers_getAccessToken, params=query_getAccessToken)

        # Get the accesstoken from the web result
        access_token = response.json()["access_token"]

        return access_token


    def getFuelPrice(self):
        '''
        Returns all current fuel prices for all service stations. There may be restrictions on how often this API request can be made. It is recommended to execute this call in a separate api client as response can be over 2 mb. This API returns data for NSW.
        '''
        # Config url api prameters for getting fuel price
        url_subpart = "/FuelPriceCheck/v1/fuel/prices"

        url = self.url_base + url_subpart

        headers = {
            'Authorization': f'Bearer {self.get_accesstoken()}',
            'Content-Type': 'application/json; charset=utf-8',
            'apikey': self.API_KEY,
            'transactionid': FuelPriceCheckAPI.get_unique_transactionId(),
            'requesttimestamp': FuelPriceCheckAPI.get_datetime_now()
        }

        time.sleep(5)

        response = requests.get(url, headers=headers)

        return response



    def getNewFuelPrice(self):
        '''
        Returns all new current prices that have been submitted since the last "/fuelpricecheck/v1/fuel/prices" or "/fuelpricecheck/v1/fuel/prices/new" request using the apikey on the current day. This API returns data for NSW.
        '''
        # Config url api prameters for getting fuel price
        url_subpart = "/FuelPriceCheck/v1/fuel/prices/new"

        url = self.url_base + url_subpart

        headers = {
            'Authorization': f'Bearer {self.get_accesstoken()}',
            'Content-Type': 'application/json; charset=utf-8',
            'apikey': self.API_KEY,
            'transactionid': FuelPriceCheckAPI.get_unique_transactionId(),
            'requesttimestamp': FuelPriceCheckAPI.get_datetime_now()
        }

        time.sleep(5)

        response = requests.get(url, headers=headers)

        return response
    
    


In [21]:
# Construct the class
fuelpriceAPI = FuelPriceCheckAPI(API_KEY, AuthorizationHeader)

In [22]:
# To use the API, simply called fuelpriceAPI.getNewFuelPrice() or fuelpriceAPI.getFuelPrice() depends on the usage
responseFromGetFuelPrice = fuelpriceAPI.getFuelPrice()

In [23]:
# See the result
# print(json.dumps(responseFromGetFuelPrice.json(), indent=4, sort_keys=True))

In [24]:
# To use the API, simply called fuelpriceAPI.getNewFuelPrice() or fuelpriceAPI.getFuelPrice() depends on the usage
responseFromGetNewFuelPrice = fuelpriceAPI.getNewFuelPrice()

In [25]:
# See the result
# print(json.dumps(responseFromGetNewFuelPrice.json(), indent=4, sort_keys=True))

2. Data Integration and Storage

In [26]:
def clean_and_display_fuel_data(input_file="integrated_fuel_data.csv", output_file="integrated_fuel_data.csv", column_width=70):
    # Load the data
    df = pd.read_csv(input_file)

    # Check for missing values
    missing_values = df.isnull().sum()
    # print("Missing Values per Column:")
    # print(missing_values)

    # Drop rows with missing critical information
    df = df.dropna(subset=["ServiceStationName", "FuelCode", "PriceUpdatedDate", "Latitude", "Longitude", "Price"])

    # Fill missing values in non-critical columns
    df["Suburb"].fillna("Unknown", inplace=True)
    df["Postcode"].fillna("Unknown", inplace=True)

    # Convert FuelCode and Brand to categorical
    df["FuelCode"] = df["FuelCode"].astype("category")
    df["Brand"] = df["Brand"].astype("category")

    # Ensure Latitude, Longitude, and Price are numeric
    df["Latitude"] = pd.to_numeric(df["Latitude"], errors='coerce')
    df["Longitude"] = pd.to_numeric(df["Longitude"], errors='coerce')
    df["Price"] = pd.to_numeric(df["Price"], errors='coerce')

    # Remove invalid coordinates or prices
    df = df[
        (df["Price"] >= 0) &
        (df["Latitude"].between(-90, 90)) &
        (df["Longitude"].between(-180, 180))
    ]

    # Drop duplicate station-fuel-location combos
    df = df.drop_duplicates(subset=["ServiceStationName", "FuelCode", "Latitude", "Longitude"], keep="first")

    # Standardize text fields
    df["Brand"] = df["Brand"].str.title()
    df["Suburb"] = df["Suburb"].str.title()
    df["FuelCode"] = df["FuelCode"].str.upper()

    # Save cleaned data
    df.to_csv(output_file, index=False)
    # print(f"Cleaned data saved as '{output_file}'.")

    # # Print cleaned data as aligned columns
    # with open(output_file, mode="r", encoding="utf-8") as file:
    #     reader = csv.DictReader(file)
    #     headers = reader.fieldnames

    #     print("".join(h.ljust(column_width) for h in headers))
    #     print("-" * column_width * len(headers))

    #     for row in reader:
    #         print("".join(str(row.get(field, "")).ljust(column_width) for field in headers))

    # print("Cleaned DataFrame size:", df.shape)

    return df  

def fetch_and_save_fuel_data(fuelpriceAPI, output_file="integrated_fuel_data.csv", column_width=70):
    # Step 1: Fetch data from the API
    response = fuelpriceAPI.getFuelPrice()
    data = response.json()

    # Step 2: Create station mapping
    station_map = {
        station["code"]: {
            "ServiceStationName": station.get("name"),
            "Address": station.get("address"),
            "Brand": station.get("brand"),
            "Latitude": station.get("location", {}).get("latitude"),
            "Longitude": station.get("location", {}).get("longitude")
        }
        for station in data.get("stations", [])
    }

    # Step 3: Combine station info with prices
    combined_data = []
    for price_entry in data.get("prices", []):
        station_code = price_entry.get("stationcode")
        station_info = station_map.get(station_code)

        if station_info:
            full_address = station_info["Address"]
            try:
                parts = full_address.split(", ")
                suburb_postcode = parts[-1].rsplit(" ", 2)
                suburb = suburb_postcode[0]
                postcode = suburb_postcode[-1]
            except Exception:
                suburb, postcode = None, None

            combined_data.append({
                "ServiceStationName": station_info["ServiceStationName"],
                "Address": full_address,
                "Suburb": suburb,
                "Postcode": postcode,
                "Brand": station_info["Brand"],
                "FuelCode": price_entry.get("fueltype"),
                "Price": price_entry.get("price"),
                "PriceUpdatedDate": price_entry.get("lastupdated"),
                "Latitude": station_info["Latitude"],
                "Longitude": station_info["Longitude"]
            })

    # Step 4: Save to CSV
    df = pd.DataFrame(combined_data)
    df.to_csv(output_file, index=False)
    # print(f"Saved: {output_file}")

    # Step 5: Clean
    cleaned_df = clean_and_display_fuel_data()


def update_fuel_data(fuelpriceAPI, existing_file="integrated_fuel_data.csv", output_file="integrated_fuel_data.csv"):
    # Step 1: Load existing data
    existing_df = pd.read_csv(existing_file)

    # Step 2: Fetch new data
    response = fuelpriceAPI.getNewFuelPrice()
    new_data_json = response.json()

    # Step 3: Extract station mapping
    station_map = {
        station["code"]: {
            "ServiceStationName": station.get("name"),
            "Latitude": station.get("location", {}).get("latitude"),
            "Longitude": station.get("location", {}).get("longitude")
        }
        for station in new_data_json.get("stations", [])
    }

    # Step 4: Extract new fuel prices (subset of full dataset)
    new_price_rows = []
    for price in new_data_json.get("prices", []):
        station_code = price.get("stationcode")
        station_info = station_map.get(station_code)
        if station_info:
            new_price_rows.append({
                "ServiceStationName": station_info["ServiceStationName"],
                "FuelCode": price.get("fueltype"),
                "Latitude": station_info["Latitude"],
                "Longitude": station_info["Longitude"],
                "Price": price.get("price"),
                "PriceUpdatedDate": price.get("lastupdated")
            })

    new_df = pd.DataFrame(new_price_rows)

    if new_df.empty:
        print("No updates found in new data.")
        return existing_df

    # Step 5: Merge to update only matching entries
    updated_df = pd.merge(
        existing_df,
        new_df,
        on=["FuelCode", "Latitude", "Longitude"],
        how="left",
        suffixes=('', '_new')
    )

    # Step 6: Replace old values only if new ones are provided
    updated_df["Price"] = updated_df["Price_new"].combine_first(updated_df["Price"])
    updated_df["PriceUpdatedDate"] = updated_df["PriceUpdatedDate_new"].combine_first(updated_df["PriceUpdatedDate"])
    updated_df.drop(columns=["Price_new", "PriceUpdatedDate_new"], inplace=True)

    # Step 7: Save updated file
    updated_df.to_csv(output_file, index=False)
    # print(f"Updated {new_df.shape[0]} fuel price entries.")
    # print(f"Saved updated data to '{output_file}'.")

    return updated_df



In [27]:
fetch_and_save_fuel_data(fuelpriceAPI)

The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df["Suburb"].fillna("Unknown", inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df["Postcode"].fillna("Unknown", inplace=True)


In [28]:
updated_df = update_fuel_data(fuelpriceAPI)

# # Print cleaned data as aligned columns
# with open('integrated_fuel_data.csv', mode="r", encoding="utf-8") as file:
#     reader = csv.DictReader(file)
#     headers = reader.fieldnames

#     print("".join(h.ljust(70) for h in headers))
#     print("-" * 70 * len(headers))

#     for row in reader:
#         print("".join(str(row.get(field, "")).ljust(70) for field in headers))

No updates found in new data.


3. Data Publishing via MQTT

In [29]:
# 3. Data Publishing via MQTT


In [32]:
# Define callbacks
def on_connect(client, userdata, connect_flags, reason_code, properties):
    print("Connected with result code", str(reason_code))
    # Subscribe as soon as we connect
    client.subscribe("COMP5339/Assignment02/Group07/FuelPrice")

def on_message(client, userdata, msg):
    print(json.loads(msg.payload))

def publish_data():
    # 1. Retrieve cleaned data
    clean_data = pd.read_csv("integrated_fuel_data.csv")
    
    # 2. Create client and attach callbacks
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    client.on_connect = on_connect
    client.on_message = on_message
    
    # 3. Connect to broker
    client.connect("broker.hivemq.com", 1883, keepalive=60)
    
    # 4. Start the network loop in a background thread
    client.loop_start()
    
    # 5. Publish periodically
    for row in clean_data.itertuples():
    
        data = {
            "Index": row.Index, 
            "ServiceStationName": row.ServiceStationName,
            "Address": row.Address, 
            "Suburb": row.Suburb, 
            "Postcode": row.Postcode, 
            "Brand": row.Brand, 
            "FuelCode": row.FuelCode, 
            "Price": row.Price, 
            "PriceUpdatedDate": row.PriceUpdatedDate, 
            "Latitude": row.Latitude, 
            "Longitude": row.Longitude
        }
    
        client.publish("COMP5339/Assignment02/Group07/FuelPrice", json.dumps(data), qos=2)
    
        time.sleep(0.1)
    
    # 6. Clean up
    client.loop_stop()
    client.disconnect()

In [35]:
publish_data()

Connected with result code Success
{'Index': 4, 'ServiceStationName': 'Coles Express Coffs Harbour', 'Address': '208-212 Pacific Hwy North, Coffs Harbour NSW 2450', 'Suburb': 'Coffs Harbour', 'Postcode': '2450', 'Brand': 'Coles Express', 'FuelCode': 'U91', 'Price': 180.9, 'PriceUpdatedDate': '12/04/2025 06:15:45', 'Latitude': -30.286083, 'Longitude': 153.124146}
{'Index': 5, 'ServiceStationName': 'Shell Reddy  Express Glendale', 'Address': '593 Main Rd, Glendale NSW 2285', 'Suburb': 'Glendale', 'Postcode': '2285', 'Brand': 'Reddy Express', 'FuelCode': 'DL', 'Price': 186.9, 'PriceUpdatedDate': '16/05/2025 03:15:47', 'Latitude': -32.927065, 'Longitude': 151.637938}
{'Index': 6, 'ServiceStationName': 'Shell Reddy  Express Glendale', 'Address': '593 Main Rd, Glendale NSW 2285', 'Suburb': 'Glendale', 'Postcode': '2285', 'Brand': 'Reddy Express', 'FuelCode': 'E10', 'Price': 171.9, 'PriceUpdatedDate': '18/05/2025 23:05:39', 'Latitude': -32.927065, 'Longitude': 151.637938}
{'Index': 7, 'Servic

4. Data Subscribing and Visualisation

In [None]:
# 4. Data Subscribing and Visualisation

In [15]:
print(1)

1


5. Continuous Execution

In [None]:
# 5. Continuous Execution