In [None]:
!pip install skyfield



In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.utils.class_weight import compute_class_weight
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import LeakyReLU
from skyfield.api import load, EarthSatellite

In [None]:
# load TLE data
def load_tle(tle_file_path):
    satellites = {}
    with open(tle_file_path, 'r') as file:
        lines = file.readlines()
        for i in range(0, len(lines), 3):
            name = lines[i].strip()
            line1 = lines[i + 1].strip()
            line2 = lines[i + 2].strip()
            satellite = EarthSatellite(line1, line2, name)
            satellites[name] = satellite
    return satellites

tle_file_path = 'active.txt'
satellites = load_tle(tle_file_path)

In [None]:
# # feature engineering
# def calculate_features(satellite1, satellite2, time):
#   position1 = satellite1.at(time).position.km
#   position2 = satellite2.at(time).position.km
#   velocity1 = satellite1.at(time).velocity.km_per_s
#   velocity2 = satellite2.at(time).velocity.km_per_s

#   distance = np.linalg.norm(position1 - position2)  # positional uncertainty
#   relative_velocity = np.linalg.norm(velocity1 - velocity2)  # relative velocity
#   inclination1 = satellite1.model.inclo  # orbital inclination (degrees)
#   inclination2 = satellite2.model.inclo
#   eccentricity1 = satellite1.model.ecco  # orbital eccentricity
#   eccentricity2 = satellite2.model.ecco

#   return [distance, relative_velocity, inclination1, inclination2, eccentricity1, eccentricity2]

In [None]:
# generate sequential trajectory data
def generate_trajectory_data(satellite1, satellite2, timesteps=10):
    ts = load.timescale()
    start_time = ts.now()

    trajectories = []
    for t in range(timesteps):
      time = start_time + t*60 # increment time by 1 min

      try:
        position1 = satellite1.at(time).position.km
        position2 = satellite2.at(time).position.km
        velocity1 = satellite1.at(time).velocity.km_per_s
        velocity2 = satellite2.at(time).velocity.km_per_s

        # features
        distance = np.linalg.norm(position1 - position2)  # positional uncertainty
        relative_velocity = np.linalg.norm(velocity1 - velocity2)  # relative velocity

        inclination1 = satellite1.model.inclo if hasattr(satellite1.model, 'inclo') else 0
        inclination2 = satellite2.model.inclo if hasattr(satellite2.model, 'inclo') else 0
        eccentricity1 = satellite1.model.ecco if hasattr(satellite1.model, 'ecco') else 0
        eccentricity2 = satellite2.model.ecco if hasattr(satellite2.model, 'ecco') else 0


        # check for NaNs
        if any(np.isnan([distance, relative_velocity, inclination1, inclination2, eccentricity1, eccentricity2])):
          print(f"Warning: NaN detected at timestep {t} for satellites {satellite1.name} & {satellite2.name}")
          return None

        trajectories.append([
            distance, relative_velocity, inclination1, inclination2, eccentricity1, eccentricity2
        ])

      except Exception as e:
        print(f"Error generating trajectory at timestep {t}: {e}")
        return None

    # ensure shape is always (timesteps, features)
    if len(trajectories) < timesteps:
      missing_timesteps = timesteps - len(trajectories)
      trajectories.extend([[0] * 6] * missing_timesteps)

    return np.array(trajectories)

In [None]:
# simulated dataset
def generate_dataset(satellites, num_samples=500, timesteps=10):
    satellite_names = list(satellites.keys())
    data=[]
    labels=[]

    for _ in range(num_samples):
        sat1 = satellites[np.random.choice(satellite_names)]
        sat2 = satellites[np.random.choice(satellite_names)]
        trajectory = generate_trajectory_data(sat1, sat2, timesteps)
        if trajectory is None: # skip bad data
          continue

        collision = np.random.choice([0, 1], p=[0.95, 0.05]) # simulated label

        data.append(trajectory)
        labels.append(collision)

    return np.array(data), np.array(labels)

# generate trajectory data
timesteps=10
X, y = generate_dataset(satellites, num_samples=500, timesteps=timesteps)
print(np.isnan(X).any())

False


In [None]:
# preprocess data
n_features = X.shape[2]
scalers = [StandardScaler() for _ in range(timesteps)]
for t in range(timesteps):
  if np.isnan(X[:, t, :]).any():
    print(f"Warning: NaN detected in timestep {t}")
  X[:, t, :] = scalers[t].fit_transform(X[:, t, :])

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [None]:
# check for NaN values in input
print("Any NaN in X_train:", np.isnan(X_train).any())
print("Any NaN in X_test:", np.isnan(X_test).any())
print("Any NaN in y_train:", np.isnan(y_train).any())
print("Any NaN in y_test:", np.isnan(y_test).any())

Any NaN in X_train: False
Any NaN in X_test: False
Any NaN in y_train: False
Any NaN in y_test: False


In [None]:
# define LSTM  with hyperparameter tuning
def build_lstm_model(timesteps, n_features):
    model = Sequential([
        LSTM(64, input_shape=(timesteps, n_features), return_sequences=True, activation='tanh'),
        Dropout(0.3),
        LSTM(32, activation='tanh'),
        Dropout(0.3),
        Dense(1, activation='sigmoid') # binary classification
    ])
    model.compile(optimizer=Adam(learning_rate=0.0001, clipnorm=1.0), loss='binary_crossentropy', metrics=['accuracy'])
    return model

model = build_lstm_model(timesteps, n_features)

history = model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=100, batch_size=32, verbose=1)

  super().__init__(**kwargs)


Epoch 1/100
[1m12/12[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 99ms/step - accuracy: 0.5921 - loss: 0.6815 - val_accuracy: 0.9474 - val_loss: 0.6728
Epoch 2/100
[1m12/12[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/step - accuracy: 0.7457 - loss: 0.6716 - val_accuracy: 0.9474 - val_loss: 0.6583
Epoch 3/100
[1m12/12[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step - accuracy: 0.8263 - loss: 0.6559 - val_accuracy: 0.9474 - val_loss: 0.6428
Epoch 4/100
[1m12/12[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step - accuracy: 0.8430 - loss: 0.6477 - val_accuracy: 0.9474 - val_loss: 0.6265
Epoch 5/100
[1m12/12[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step - accuracy: 0.8822 - loss: 0.6328 - val_accuracy: 0.9474 - val_loss: 0.6090
Epoch 6/100
[1m12/12[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step - accuracy: 0.9180 - loss: 0.6108 - val_accuracy: 0.9474 - val_loss: 0.5897
Epoch 7/100
[1m12/12[0m [

In [None]:
def build_model(hp):
    model = Sequential()
    model.add(LSTM(
        units=hp.Int('units_1', min_value=32, max_value=128, step=16),
        input_shape=(timesteps, n_features),
        return_sequences=True,
        activation='relu'
    ))
    model.add(Dropout(hp.Float('dropout_1', min_value=0.1, max_value=0.5, step=0.1)))

    model.add(LSTM(
        units=hp.Int('units_2', min_value=32, max_value=128, step=16),
        activation='relu'
    ))
    model.add(Dropout(hp.Float('dropout_2', min_value=0.1, max_value=0.5, step=0.1)))


    model.add(Dense(1, activation='sigmoid'))


    model.compile(
        optimizer=Adam(learning_rate=hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4])),
        loss='binary_crossentropy',
        metrics=['accuracy']
    )
    return model

In [None]:
pip install keras_tuner

Collecting keras_tuner
  Downloading keras_tuner-1.4.7-py3-none-any.whl.metadata (5.4 kB)
Collecting kt-legacy (from keras_tuner)
  Downloading kt_legacy-1.0.5-py3-none-any.whl.metadata (221 bytes)
Downloading keras_tuner-1.4.7-py3-none-any.whl (129 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m129.1/129.1 kB[0m [31m9.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading kt_legacy-1.0.5-py3-none-any.whl (9.6 kB)
Installing collected packages: kt-legacy, keras_tuner
Successfully installed keras_tuner-1.4.7 kt-legacy-1.0.5


In [None]:
# Hyperparameter tuning
import keras_tuner as kt

tuner = kt.RandomSearch(
    build_model,
    objective='val_accuracy',
    max_trials=20,
    executions_per_trial=2,
    directory='lstm_hyperparameter_tuning',
    project_name='satellite_collision_prediction'
)

# Run hyperparameter search
tuner.search(X_train, y_train, validation_data=(X_test, y_test), epochs=10, batch_size=32)

# Get the best hyperparameters
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]

print(f"""
The optimal number of units in the first LSTM layer is {best_hps.get('units_1')},
the number of units in the second LSTM layer is {best_hps.get('units_2')},
the optimal dropout rates are {best_hps.get('dropout_1')} and {best_hps.get('dropout_2')},
and the best learning rate is {best_hps.get('learning_rate')}.
""")

Trial 20 Complete [00h 00m 26s]
val_accuracy: 0.9684210419654846

Best val_accuracy So Far: 0.9684210419654846
Total elapsed time: 00h 07m 17s

The optimal number of units in the first LSTM layer is 64,
the number of units in the second LSTM layer is 64,
the optimal dropout rates are 0.1 and 0.1,
and the best learning rate is 0.01.



In [None]:
# Use best hyperparameters found
final_model = tuner.hypermodel.build(best_hps)
final_model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=30, batch_size=32)

# Evaluate the model
loss, accuracy = final_model.evaluate(X_test, y_test)
print(f"\nFinal Model Test Accuracy: {accuracy * 100:.2f}%")

Epoch 1/30
[1m12/12[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 64ms/step - accuracy: 0.8551 - loss: 0.4998 - val_accuracy: 0.9684 - val_loss: 0.3235
Epoch 2/30
[1m12/12[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 19ms/step - accuracy: 0.9532 - loss: 0.3481 - val_accuracy: 0.9684 - val_loss: 0.1451
Epoch 3/30
[1m12/12[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step - accuracy: 0.9392 - loss: 0.3178 - val_accuracy: 0.9684 - val_loss: 0.2056
Epoch 4/30
[1m12/12[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step - accuracy: 0.9512 - loss: 0.2207 - val_accuracy: 0.9684 - val_loss: 0.1605
Epoch 5/30
[1m12/12[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step - accuracy: 0.9581 - loss: 0.1645 - val_accuracy: 0.9684 - val_loss: 0.1632
Epoch 6/30
[1m12/12[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step - accuracy: 0.9487 - loss: 0.1968 - val_accuracy: 0.9684 - val_loss: 0.1562
Epoch 7/30
[1m12/12[0m [32m━━━━

In [None]:
# Real-time collision prediction to predict risk for 2 satellites
sat1 = satellites[list(satellites.keys())[0]]
sat2 = satellites[list(satellites.keys())[1]]

trajectory = generate_trajectory_data(sat1, sat2, timesteps)
for t in range(timesteps):
    trajectory[t, :] = scalers[t].transform(trajectory[t, :].reshape(1, -1))

trajectory = np.expand_dims(trajectory, axis=0)  # Add batch dimension
collision_prob = model.predict(trajectory)[0][0]
print(f"\nPredicted Collision Probability: {collision_prob:.4f}")

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 323ms/step

Predicted Collision Probability: 0.0108
