# Heuristic baseline

This jupyter notebook will guide the reader on implementing the Satellite Node Identification and Classification Tool (SNICT) as a heuristic method to identify and classify satellite nodes.

## We will first begin by importing the required packages.

In [1]:
import pandas as pd
import numpy as np
import os
from node import Node
from datetime import datetime, timedelta
from pathlib import Path

Here we will set up the input and output paths

In [3]:
DATA_DIR = Path('~/data/splid_challenge_data/train').expanduser()
OUTPUT_FILEPATH = Path('~/data/submission/submission.csv').expanduser()

For this work, we will be using custom classes to store node information.

In [4]:
class index_dict:
    def __init__(self):
        self.times = self.IDADIK()
        self.indices = []
        self.AD_dex =[]
        self.modes = self.mode()

    class IDADIK:
        def __init__(self):
            self.ID = []
            self.AD = []
            self.IK = []
    
    class mode:
        def __init__(self):
            self.SK = []
            self.end = []

detected = index_dict()
filtered = index_dict()

We will then search for the training data in the `dataset/train` folder and return a sorted list for the filepaths.

In [6]:
datalist = []

# Searching for training data within the dataset folder
for file in os.listdir(DATA_DIR):
    if file.endswith(".csv"):
        datalist.append(os.path.join("../../dataset/train/", file))

# Sort the training data and labels
datalist = sorted(datalist)

# Print the sorted filepath to the training data
print(datalist)

['../../dataset/train/10.csv', '../../dataset/train/11.csv', '../../dataset/train/12.csv', '../../dataset/train/13.csv', '../../dataset/train/14.csv', '../../dataset/train/15.csv', '../../dataset/train/16.csv', '../../dataset/train/17.csv', '../../dataset/train/18.csv', '../../dataset/train/19.csv', '../../dataset/train/2.csv', '../../dataset/train/3.csv', '../../dataset/train/4.csv', '../../dataset/train/5.csv', '../../dataset/train/6.csv', '../../dataset/train/7.csv', '../../dataset/train/8.csv', '../../dataset/train/9.csv']


Next, we will be reading the training data as a pandas dataframe.

In [12]:
data_paths = Path(DATA_DIR).glob('*.csv')
# Check if test_data is empty
if not data_paths:
    raise ValueError(f'No csv files found in {DATA_DIR}')
data = pd.DataFrame()
for data_file in data_paths:
    data_df = pd.read_csv(data_file)
    data_df['ObjectID'] = int(data_file.stem)
    data_df['TimeIndex'] = range(len(data_df))
    data = pd.concat([data, data_df], ignore_index=True)
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 39096 entries, 0 to 39095
Data columns (total 18 columns):
 #   Column                       Non-Null Count  Dtype  
---  ------                       --------------  -----  
 0   Eccentricity                 39096 non-null  float64
 1   Semimajor Axis (m)           39096 non-null  float64
 2   Inclination (deg)            39096 non-null  float64
 3   RAAN (deg)                   39096 non-null  float64
 4   Argument of Periapsis (deg)  39096 non-null  float64
 5   Mean Anomaly (deg)           39096 non-null  float64
 6   True Anomaly (deg)           39096 non-null  float64
 7   Latitude (deg)               39096 non-null  float64
 8   Longitude (deg)              39096 non-null  float64
 9   Altitude (m)                 39096 non-null  float64
 10  X (m)                        39096 non-null  float64
 11  Y (m)                        39096 non-null  float64
 12  Z (m)                        39096 non-null  float64
 13  Vx (m/s)        

In [4]:
# Load the first training data. Note: Python starts counting from 0, instead of 1.
idx_data = 4

data_path = datalist[idx_data]
data = pd.read_csv(data_path)

The SNICT uses longitudinal and inclination information to detect and characterize the changes in a satellite's behavioral mode. The longitudinal an inclination information are extracted from the pandas dataframe.

In [5]:
# Read the objectID from the filename
satcat=data_path[-7:-4]

# Extracting longitudinal and inclination information from the pandas dataframe
longitudes = data["Longitude (deg)"]
inclinations = data["Inclination (deg)"]

# Arbitrary assign start time and end time. Note: SNICT was developed to read in time-stamped data, 
# however, our training data are not label with a time stamp, hence an arbitrary start and end time
# are selected
starttime = datetime.fromisoformat("2023-01-01 00:00:00+00:00")
endtime = datetime.fromisoformat("2023-07-01 00:00:00+00:00")

## Detection of East-West Pattern-of-Life Nodes

Identify possible East-West PoL nodes by analyzing the changes in longitudinal values.

In [6]:
# Get std for longitude over a 24 hours window
lon_std = []
nodes = []
steps_per_day = 12
lon_was_baseline = True
lon_baseline = 0.03

for i in range(len(data["Longitude (deg)"])):
    if i <= steps_per_day:
        lon_std.append(np.std(data["Longitude (deg)"][0:steps_per_day]))
    else:
        lon_std.append(np.std(data["Longitude (deg)"][i-steps_per_day:i]))

ssEW = Node(satcat=satcat,
            t=starttime,
            index=0,
            ntype="SS",
            signal="EW")
es = Node(satcat=satcat,
            t=endtime,
            index=len(data["Longitude (deg)"])-1,
            ntype="ES",
            signal="ES",
            mtype="ES")

# Run LS detection
for i in range(steps_per_day+1,len(lon_std)-steps_per_day):             # if at least 1 day has elapsed since t0
    max_lon_std_24h = np.max(lon_std[i-steps_per_day:i])
    min_lon_std_24h = np.min(lon_std[i-steps_per_day:i])
    A = np.abs(max_lon_std_24h-min_lon_std_24h)/2
    th_ = 0.95*A

    # ID detection
    if (lon_std[i]>lon_baseline) & lon_was_baseline:                    # if sd is elevated & last sd was at baseline
        before = np.mean(data["Longitude (deg)"][i-steps_per_day:i])    # mean of previous day's longitudes
        after = np.mean(data["Longitude (deg)"][i:i+steps_per_day])     # mean of next day's longitudes
        # if not temporary noise, then real ID
        if np.abs(before-after)>0.3:                                    # if means are different
            lon_was_baseline = False                                    # the sd is not yet back at baseline
            index = i
            if i < steps_per_day+2:
                ssEW.mtype = "NK"
            else:
                detected.times.ID.append(starttime+timedelta(hours=i*2))
    # IK detection
    elif (lon_std[i]<=lon_baseline) & (not lon_was_baseline):           # elif sd is not elevated and drift has already been initialized
        drift_ended = True                                              # toggle end-of-drift boolean 
        for j in range(steps_per_day):                                  # for the next day, check...
            if np.abs(data["Longitude (deg)"][i]-data["Longitude (deg)"][i+j])>0.3:       # if the longitude changes from the current value
                drift_ended = False                                     # the drift has not ended
        if drift_ended:                                                 # if the drift has ended
            lon_was_baseline = True                                     # the sd is back to baseline
            detected.times.IK.append(starttime+timedelta(hours=i*2))    # save tnow as end-of-drift
            detected.indices.append([index,i])                          # save indices for t-start & t-end

    # Last step
    elif (i == (len(lon_std)-steps_per_day-1))\
        &(not lon_was_baseline):
        detected.times.IK.append(starttime+timedelta(hours=i*2))
        detected.indices.append([index,i])
    
    # AD detection
    elif ((lon_std[i]-max_lon_std_24h>th_) or (min_lon_std_24h-lon_std[i]>th_)) & (not lon_was_baseline):          # elif sd is elevated and drift has already been initialized
        if i >= steps_per_day+3:
            detected.times.AD.append(starttime+timedelta(hours=i*2))
            detected.AD_dex.append(i)


Next, we will filter the East-West nodes and merge nearby nodes.

In [7]:
def add_node(n):
    nodes[len(nodes)-1].char_mode(
        next_index = n.index,
        lons = longitudes,
        incs = inclinations
    )
    if n.type == "AD":
        nodes[len(nodes)-1].mtype = "NK"

    if (nodes[len(nodes)-1].mtype != "NK"):
        filtered.indices.append([nodes[len(nodes)-1].index,n.index])
        filtered.modes.SK.append(nodes[len(nodes)-1].mtype)
        stop_NS = True if n.type == "ID" else False
        filtered.modes.end.append(stop_NS)
    nodes.append(n)

toggle = True
nodes.append(ssEW)
if len(detected.times.IK) == 1:
    if len(detected.times.ID) == 1:
        filtered.times.ID.append(detected.times.ID[0])                                  # keep the current ID
        ID = Node(satcat,
                detected.times.ID[0],
                index=detected.indices[0][0],
                ntype='ID',
                lon=longitudes[detected.indices[0][0]],
                signal="EW")
        add_node(ID)
    filtered.times.IK.append(detected.times.IK[0]) 
    IK = Node(satcat,
            detected.times.IK[0],
            index=detected.indices[0][1],
            ntype='IK',
            lon=longitudes[detected.indices[0][1]],
            signal="EW")
    apnd = True
    if len(detected.times.AD) == 1:
        AD = Node(satcat,
                  detected.times.AD[0],
                  index=detected.AD_dex[0],
                  ntype="AD",
                  signal="EW")
        add_node(AD)
    elif len(detected.times.AD) == 0:
        pass
    else:
        for j in range(len(detected.times.AD)):
            ad = Node(satcat,
                  detected.times.AD[j],
                  index=detected.AD_dex[j],
                  ntype="AD",
                  signal="EW")
            ad_next = Node(satcat,
                  detected.times.AD[j+1],
                  index=detected.AD_dex[j+1],
                  ntype="AD",
                  signal="EW") \
                if j < (len(detected.times.AD)-1) else None
            if (ad.t>starttime+timedelta(hours=detected.indices[0][0]*2))&(ad.t<IK.t):
                if apnd & (ad_next is not None):
                    if ((ad_next.t-ad.t)>timedelta(hours=24)):
                        add_node(ad)
                    else:
                        add_node(ad)
                        apnd = False
                elif apnd & (ad_next is None):
                    add_node(ad)
                elif (not apnd) & (ad_next is not None):
                    if ((ad_next.t-ad.t)>timedelta(hours=24)):
                        apnd = True
    if detected.indices[0][1] != (len(lon_std)-steps_per_day-1):
        add_node(IK)    

for i in range(len(detected.times.IK)-1):                                 # for each longitudinal shift detection
    if toggle:                                                            # if the last ID was not discarded
        if ((starttime+timedelta(hours=detected.indices[i+1][0]*2)-detected.times.IK[i])>timedelta(hours=36)):# if the time between the current IK & next ID is longer than 48 hours
            filtered.times.ID.append(detected.times.ID[i])                # keep the current ID
            filtered.times.IK.append(detected.times.IK[i])                # keep the current IK
            ID = Node(satcat,
                    detected.times.ID[i],
                    index=detected.indices[i][0],
                    ntype='ID',
                    lon=longitudes[detected.indices[i][0]],
                    signal="EW")
            add_node(ID)
            IK = Node(satcat,
                    detected.times.IK[i],
                    index=detected.indices[i][1],
                    ntype='IK',
                    lon=longitudes[detected.indices[i][1]],
                    signal="EW")
            apnd = True
            for j in range(len(detected.times.AD)):
                ad = Node(satcat,
                  detected.times.AD[j],
                  index=detected.AD_dex[j],
                  ntype="AD",
                  signal="EW")
                ad_next = Node(satcat,
                  detected.times.AD[j+1],
                  index=detected.AD_dex[j+1],
                  ntype="AD",
                  signal="EW") \
                    if j < (len(detected.times.AD)-1) else None
                if (ad.t>ID.t)&(ad.t<IK.t):
                    if apnd & (ad_next is not None):
                        if ((ad_next.t-ad.t)>timedelta(hours=24)):
                            add_node(ad)
                        else:
                            add_node(ad)
                            apnd = False
                    elif apnd & (ad_next is None):
                        add_node(ad)
                    elif (not apnd) & (ad_next is not None):
                        if ((ad_next.t-ad.t)>timedelta(hours=24)):
                            apnd = True
            if detected.indices[0][1] != (
                len(lon_std)-steps_per_day-1):
                add_node(IK)    
            if i == len(detected.times.IK)-2:                             # if the next drift is the last drift
                filtered.times.ID.append(starttime+timedelta(hours=detected.indices[i+1][0]*2))                    # keep the next ID
                ID = Node(satcat,
                        starttime+timedelta(hours=detected.indices[i+1][0]*2),
                        index=detected.indices[i+1][0],
                        ntype='ID',
                        lon=longitudes[detected.indices[i+1][0]],
                        signal="EW")
                add_node(ID)
                IK = Node(satcat,
                        detected.times.IK[i+1],
                        index=detected.indices[i+1][1],
                        ntype='IK',
                        lon=longitudes[detected.indices[i+1][1]],
                        signal="EW")
                apnd = True
                for j in range(len(detected.times.AD)):
                    ad = Node(satcat,
                        detected.times.AD[j],
                        index=detected.AD_dex[j],
                        ntype="AD",
                        signal="EW")
                    ad_next = Node(satcat,
                        detected.times.AD[j+1],
                        index=detected.AD_dex[j+1],
                        ntype="AD",
                        signal="EW") \
                        if j < (len(detected.times.AD)-1) else None
                    if (ad.t>ID.t)&(ad.t<IK.t):
                        if apnd & (ad_next is not None):
                            if ((ad_next.t-ad.t)>timedelta(
                                hours=24)):
                                add_node(ad)
                            else:
                                add_node(ad)
                                apnd = False
                        elif apnd & (ad_next is None):
                            add_node(ad)
                        elif (not apnd) & (ad_next is not None):
                            if ((ad_next.t-ad.t)>timedelta(
                                hours=24)):
                                apnd = True
                if detected.indices[i][1] != (
                    len(lon_std)-steps_per_day-1):
                    filtered.times.IK.append(detected.times.IK[i+1])      # keep the next IK
                    add_node(IK)
        else:                                                             # if the next ID and the current IK are 48 hours apart or less
            ID = Node(satcat,
                    detected.times.ID[i],
                    index=detected.indices[i][0],
                    ntype='ID',
                    lon=longitudes[detected.indices[i][0]],
                    signal="EW")                                          # keep the current ID
            add_node(ID)
            AD = Node(satcat,
                    detected.times.IK[i],
                    index=detected.indices[i][1],
                    ntype='AD',
                    lon=longitudes[detected.indices[i][1]],
                    signal="EW")                                          # change the current IK to an AD
            IK = Node(satcat,
                    detected.times.IK[i+1],
                    index=detected.indices[i+1][1],
                    ntype='IK',
                    lon=longitudes[detected.indices[i+1][1]],
                    signal="EW")                                          # exchange the current IK for the next one
            add_node(AD)
            apnd = True
            for j in range(len(detected.times.AD)):
                ad = Node(satcat,
                  detected.times.AD[j],
                  index=detected.AD_dex[j],
                  ntype="AD",
                  signal="EW")
                ad_next = Node(satcat,
                  detected.times.AD[j+1],
                  index=detected.AD_dex[j+1],
                  ntype="AD",
                  signal="EW") \
                    if j < (len(detected.times.AD)-1) else None
                if (ad.t>ID.t)&(ad.t<IK.t):
                    if apnd & (ad_next is not None):
                        if ((ad_next.t-ad.t)>timedelta(hours=24)):
                            add_node(ad)
                        else:
                            add_node(ad)
                            apnd = False
                    elif apnd & (ad_next is None):
                        add_node(ad)
                    elif (not apnd) & (ad_next is not None):
                        if ((ad_next.t-ad.t)>timedelta(hours=24)):
                            apnd = True
            if detected.indices[0][1] != (
                len(lon_std)-steps_per_day-1):
                add_node(IK)    
            filtered.times.ID.append(detected.times.ID[i])
            filtered.times.AD.append(detected.times.IK[i])
            filtered.times.IK.append(detected.times.IK[i+1])
            toggle = False                                                # skip the redundant drift
    else:
        toggle = True
add_node(es)

## Detection of North-South Pattern-of-Life Nodes

Identify possible North-South PoL nodes by analyzing the changes in inclination values.

In [8]:
ssNS = Node(
        satcat=satcat,
        t=starttime,
        index=0,
        ntype="SS",
        signal="NS")
for j in range(len(filtered.indices)):
    indices = filtered.indices[j]
    first = True if indices[0] == 0 else False
    times = []
    dexs = []
    inc = inclinations[indices[0]:indices[1]].to_numpy()
    t = np.arange(indices[0],indices[1])*2
    rate = (steps_per_day/(indices[1]-indices[0]))*(np.max(inc)-np.min(inc))
    XIPS_inc_per_day = 0.0005 #0.035/30
    if (rate < XIPS_inc_per_day) and (indices[0] < steps_per_day) and (indices[1] > steps_per_day):
        if filtered.modes.end[j]:
            nodes.append(Node(
                satcat=satcat,
                t=starttime+timedelta(hours=indices[1]*2),
                index=indices[1],
                ntype="ID",
                signal="NS",
                mtype="NK"
            ))
        ssNS.mtype = filtered.modes.SK[j]
    elif (rate < XIPS_inc_per_day):
        nodes.append(Node(
            satcat=satcat,
            t=times[0],
            index=dexs[0],
            ntype="IK",
            signal="NS",
            mtype=filtered.modes.SK[j]
        ))
        if filtered.modes.end[j]:
            nodes.append(Node(
                satcat=satcat,
                t=starttime+timedelta(hours=indices[1]*2),
                index=indices[1],
                ntype="ID",
                signal="NS",
                mtype="NK"
            ))
    else:
        dt = [0.0]
        for i in range(len(inc)-1):
            dt.append((inc[i+1]-inc[i])/(2*60*60))
        prev = 1.0
        for i in range(len(dt)-1):
            if np.abs(dt[i])> 5.5e-7:
                times.append(starttime+timedelta(hours=float(t[i])))
                dexs.append(i+indices[0])
                if (np.abs(np.mean(inc[0:i])-np.mean(inc[i:len(inc)]))/np.std(inc[0:i]))/prev < 1.0:
                    if first and len(times)==2:
                        ssNS.mtype = filtered.modes.SK[0]
                        first = False
                elif len(times)==2:
                    first = False
                prev = np.abs(np.mean(inc[0:i])-np.mean(inc[i:len(inc)]))/np.std(inc[0:i])
                
        if len(times)>0:
            nodes.append(Node(
                satcat=satcat,
                t=times[0],
                index=dexs[0],
                ntype="IK",
                signal="NS",
                mtype=filtered.modes.SK[j]
            ))
            if filtered.modes.end[j]:
                nodes.append(Node(
                    satcat=satcat,
                    t=starttime+timedelta(hours=indices[1]*2),
                    index=indices[1],
                    ntype="ID",
                    signal="NS",
                    mtype="NK"
                ))
        elif filtered.indices[0][0] == 0:
            ssNS.mtype = filtered.modes.SK[0]
        else:
            ssNS.mtype = "NK"
nodes.append(ssNS)
nodes.sort(key=lambda x: x.t)

## Data postprocessing

Lastly, we will tidy out the output of the SNICT algorithm to match the SPLID data format.

In [9]:
# Convert timestamp back into timeindex and format the output to the correct format in a pandas dataframe
ObjectID_list = []
TimeIndex_list = []
Direction_list = []
Node_list = []
Type_list = []
for i in range(len(nodes)):
    ObjectID_list.append(nodes[i].satcat)
    TimeIndex_list.append(int(((nodes[i].t-starttime).days*24+(nodes[i].t-starttime).seconds/3600)/2))
    Direction_list.append(nodes[i].signal)
    Node_list.append(nodes[i].type)
    Type_list.append(nodes[i].mtype)
    
# Initialize data of lists. 
data = {'ObjectID': ObjectID_list, 
        'TimeIndex': TimeIndex_list,
        'Direction': Direction_list, 
        'Node': Node_list,
        'Type': Type_list} 
  
# Create the pandas DataFrame 
prediction = pd.DataFrame(data) 

print(prediction)

   ObjectID  TimeIndex Direction Node Type
0         5          0        EW   SS   CK
1         5          0        NS   SS   CK
2         5        919        EW   ID   NK
3         5        930        EW   AD   NK
4         5       2172        ES   ES   ES


In [10]:
# Save the prediction into a csv file 
prediction.to_csv('prediction.csv', index=False)  