In [2]:
"""
The Autonomous Cooperative Consensus Orbit Determination (ACCORD) framework.
Author: Beth Probert
Email: beth.probert@strath.ac.uk

Copyright (C) 2025 Applied Space Technology Laboratory

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""

import json
import hashlib # TODO - might use a different algorithm? we will see if needed. Start small ffs dont get overwhelmed by the whole ledger
from typing import Optional
import numpy as np
import astropy.constants as ac
from sgp4.io import twoline2rv
from sgp4.earth_gravity import wgs72
from skyfield.api import EarthSatellite, load

# Global Variables for consensus
# TODO - May tune these to optimise performance
CONFIRMATION_THRESHOLD = 1
REJECTION_THRESHOLD = -1
CONFIRMATION_STEP = 0.1

# Global astrophysical values
G = ac.G # Gravitational Constant in m3 / (kg s2)
M = ac.M_earth # Mass of the Earth in kg
R = float(ac.R_earth.to("km").value) # Radius of the Earth in km

In [3]:
def load_json_data(file_name: str) -> list:
    """
    Turns json data in a file into a Python dict.
    JSON data must be in the Celestrak format
    https://rhodesmill.org/skyfield/earth-satellites.html
    """
    with load.open(file_name) as f:
        data = json.load(f)

    ts = load.timescale()
    satellite_list = [EarthSatellite.from_omm(ts, fields) for fields in data]
    return satellite_list

# Retrieve data
od_data = load_json_data("od_data.json")

Epoch (UTC): 2024-05-06T19:53:05Z
Altitude (km): 418.17695676229323
Speed (km/s): 7.659222658285209


In [None]:
class Transaction():
    """
    Transaction containing information to be submitted in the Distributed Ledger.
    Consensus needs to be reached on the validity of the information received.
    """
    def __init__(self, sender_address, recipient_address, sender_private_key, timestamp, tx_data) -> None:
        self.sender_address: int = sender_address
        self.recipient_address: int = recipient_address
        self.sender_private_key: str = sender_private_key
        self.timestamp: str = timestamp
        # TODO Add cooperative OD information to TLE data json
        # tx_data will include cooperative OD data, so including the ID 
        # and trajectory info for a witnessed satellite as well.
        self.tx_data: str = tx_data
        
        self.parent_hashes: list[str] = []
        self.confirmation_score: float = 0
        self.consensus_reached: bool = False
        self.is_confirmed: bool = False
        self.is_rejected: bool = False

        self.hash = self.calculate_hash()
        
    def calculate_hash(self) -> str:
        """
        Calculates a hash string for a transaction. The hash is generated by encoding all of
        the transaction's unique information, so that ANY change to a transaction results in a new 
        hash, making tampering easier to identify.
        """
        return hashlib.sha256(str(self.sender_address).encode() + self.timestamp.encode() + 
                              self.sender_private_key.encode() + self.tx_data.encode() + 
                              str(self.recipient_address).encode()
                             ).hexdigest()

In [None]:
class Node():
    """
    A class representing a node in the network, in this case a LEO satellite. 
    """
    def __init__(self, id: str) -> None:
        self.id: str = id
        # Reputation starts at 0, affected by validity and accuracy
        self.reputation: float = 0

In [None]:
class DAG():
    """
    A class representing the Directed Acyclic Graph (DAG) Distributed Ledger Technology.
    When a transaction is received, it is added to the DAG. The number of parents
    for each transaction is decided using a tip selection algorithm.
    """
    
    def __init__(self) -> None:
        # TODO - need to change from a chain with appends into a DAG with parents
        # TODO - need a way to check that the DAG has not been tampered with
        # TODO - could be a numpy array instead - maybe it should be for 2d
        self.chain: np.typing.ArrayLike = [self.create_genesis_tx()]
    
    def create_genesis_tx(self) -> Transaction:
        """
        Creates the genesis transaction to initialise the DAG and provide a parent
        for the first real transaction.
        """
        return Transaction(0, 0, "1234", "24/06/2025", "Genesis Transaction")
        
    def check_thresholds(self, transaction: Transaction) -> None:
        """
        Check if the transaction confirmation or rejection thresholds have been crossed. 
        This will affect weighting. is_confirmed = strong weighting, else weak weighting
        """
        if REJECTION_THRESHOLD <= transaction.confirmation_score <= CONFIRMATION_THRESHOLD:
            transaction.confirmation_score +=  CONFIRMATION_STEP
        else:
            transaction.confirmation_score -= CONFIRMATION_STEP
        
        if transaction.confirmation_score >= CONFIRMATION_THRESHOLD:
            transaction.is_confirmed = True
        elif transaction.confirmation_score <= REJECTION_THRESHOLD:
            transaction.is_rejected = True
    
    def get_latest_tx(self) -> Transaction:
        """
        Retrieves the most recently created transaction stored on the chain.
        TODO - may not be needed? For parent selection
        """
        return self.chain[len(self.chain) - 1]
    
    def get_second_latest_tx(self) -> Optional[Transaction]:
        """
        Retrieves the second-most recently created transaction stored on the chain.
        TODO - may not be needed? For parent selection
        """
        if len(self.chain) >= 2:
            return self.chain[len(self.chain) - 2]
        else:
            return None
    
    def add_tx(self, transaction: Transaction) -> None:
        """
        Add a transaction to the DAG
        """
        # TODO Just append for now, sort out tip selection later and number of parents
        # TODO - tx or blocks?? start with tx for now
        # TODO - fixed number of parents: 2
        parent1 = self.get_latest_tx()
        parent2 = self.get_second_latest_tx()
        
        # There is guaranteed to be one parent - the genesis transaction in the chain.
        # The first transaction will only have one parent. All others will have >= 2.
        transaction.parent_hashes.append(parent1)
        if parent2 is not None:
            transaction.parent_hashes.append(parent1)
            
        np.append(self.chain, transaction)
        # TODO - need to add consensus mechanism in here, may need to be a function within this class rather than a separate class to avoid circles
        self.check_thresholds(transaction)
        # THRESHOLDS affect tip selection for parents - TODO

In [None]:
class ConsensusMechanism():
    """
    The Proof of Inter-Satellite Evaluation (PoISE) consensus mechanism.
        
    TODO - need to check: if physically possible, how many times its been seen (maybe new param?)
    # Node needs an ID, trust factor number
    """
    # TODO - proof of Location XYO paper, and bidirectional heuristics (need to check for invalid, or valid but incorrect/inaccurate)(4.3, proof of proximity)
    # TODO - format for data? class for verification/consensus? Need some calcs
    # TODO TODAY - SLAM DRONE PAPER, and consensus on position, doppler shift?? what data so I have to choose from?
    # DAGmap - map consensus could be something to build upon? does it have a ground truth?
    # PowerGraph - consensus for trust level, calculates validity of transacion from probability level. I guess I need to calculate validity from some maths? possible - if yes, hen how accurate/likely
    # probability distruibution for observations? Algorithm one in PowerGraph paper
    
    def __init__(self):
        self.consensus_threshold : float = 1.0 # TODO - tune
        self.reputation_step: float = 0.1 # TODO - tune
    
    def data_is_valid(self, od_data: list) -> bool:
        """
        TODO Check if received data is valid, i.e. physically and logically possible
        Assume data comes in a standard TLE format from a Celestrak JSON
        Checks validity to reduce computation effort in consensus for data that is impossible
        """
        # If the list is empty, there is no data that can be valid
        if len(od_data) == 0:
            return False

        for sat in od_data:
            # To prevent all of SpaceX agreeing that they didn't see any satellites over Ukraine, nope, no way, they were in geostationary orbit above the North Pole            
            
            # Use sgp4 propogation to get altitude and velocity
            # More suitable than keplerian motion
            # The Two-Body-Keplerian orbit propagator is the less precise because 
            # it simplifies the gravitational field of the Earth as spherical and 
            # ignores all other sources of orbital perturbations. The SGP4 
            # orbit propagator enhances the accuracy by considering the 
            # oblateness of the Earth and the effects of atmospheric drag.
            epoch = sat.epoch

            state_vector = sat.at(epoch) # works in ITRF frame
            position = state_vector.position.km
            velocity = state_vector.velocity.km_per_s

            altitude_km = np.linalg.norm(position) - R
            speed_kmps = np.linalg.norm(velocity)

            print("Epoch (UTC):", epoch.utc_iso())
            print("Altitude (km):", altitude)
            print("Speed (km/s):", speed)

            # Check TLE formatting
            if not twoline2rv(x, y, wgs72): # TODO - might need to look at skyfield, verify epoch and speed and ID?? what else?
                return False
            
            # For LEO, satellites should have an inclination between 0 and 180 degrees
            # Inclination is initially provided in radians
            if not (0 <= (sat.inclo * 180 / np.pi) <= 180):
                return False
            
            # Eccenricity should be between 0 and 1
            if not (0 <= sat.ecco <= 1):
                return False
            
            # Altitude should be in LEO range of 200km to 2000km above the Earth's surface
            if not (200 <= altitude_km <= 2000):
                return False

        return True
    
    @staticmethod
    def proof_of_inter_satellite_evaluation(self, dag: DAG, node: Node, transaction: Transaction) -> bool:
        """
        Returns a bool of it consensus has been reached     
        """
        # 1)  Get the data 
        od_data = load_json_data("od_data.json")
        
        # 2) TODO Check if data is valid, if not - ignore. Consensus cannot be reached on invalid data. If yes, add to DAG
        if self.data_is_valid(od_data):
            pass
        else:
            # Reduce node reputation for providing invalid data
            node.reputation -= self.reputation_step
            return False
        
        # 3) TODO Check we have enough data to be bft (3f + 1)
        # if not, consensus cannot be reached. 
        if len(dag.chain) < 4:
            return False
        
        # 4) TODO Check if satellite has been witnessed before
        #4a if yes, does this data agree with other data/ is it correct? Assign correctness score -> affects transaction
        #  This is going to be very tricky. How do I get this data?? Where do I store it? Do I want this to tie in to how parents are selected?
        
        # 5) TODO is sensor data accurate (done regardless of previous witnessing). Assign accuracy score -> affects transaction and node reputation
        # Again, might be tricky. Probability distribution here? Like in the PowerGraph paper?
        
        # 6) TODO calculate consensus score - node reputation, accuracy and correctness all factor
        # Need to develop an equation - this will take some reading and tuning
        
        # 7) TODO if consensus score above threshold, consensus reached. Else not.