<a href="https://colab.research.google.com/github/hajira404/Collision-sim/blob/main/Collision_Prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install sgp4



In [None]:
debris_url = "https://celestrak.org/NORAD/elements/gp.php?GROUP=fengyun-1c-debris&FORMAT=tle"
sat_url = "https://celestrak.org/NORAD/elements/gp.php?GROUP=active&FORMAT=tle"

import requests

def fetch_tle(url):
    response = requests.get(url)
    lines = response.text.strip().splitlines()
    tle_data = []
    for i in range(0, len(lines)-2, 3):
        name = lines[i].strip()
        line1 = lines[i+1].strip()
        line2 = lines[i+2].strip()
        tle_data.append((name, line1, line2))
    return tle_data

# Fetch active satellites and debris
sat_tles = fetch_tle(sat_url)[:500]
debris_tles = fetch_tle(debris_url)[:1931]
print("Satellites loaded:", len(sat_tles))
print("Debris loaded:", len(debris_tles))



Satellites loaded: 500
Debris loaded: 1903


In [None]:
from sgp4.api import Satrec, jday
from datetime import datetime, timedelta

def get_positions(tle_list, hours=24, step=60):
    positions = {}
    now = datetime.utcnow()
    time_points = [now + timedelta(minutes=step*i) for i in range(hours)]

    for name, line1, line2 in tle_list:
        sat = Satrec.twoline2rv(line1, line2)
        sat_positions = []
        for t in time_points:
            jd, fr = jday(t.year, t.month, t.day, t.hour, t.minute, t.second)
            e, r, v = sat.sgp4(jd, fr)
            if e == 0:
                sat_positions.append((t, r))  # r = (x, y, z)
        positions[name] = sat_positions
    return positions
sat_positions = get_positions(sat_tles)
debris_positions = get_positions(debris_tles)
print("Total satellites with valid positions:", len(sat_positions))
print("Total debris with valid positions:", len(debris_positions))



Total satellites with valid positions: 500
Total debris with valid positions: 2


In [None]:
from math import sqrt
def compute_distances(sat_positions, debris_positions):
    all_distances = []

    for t_idx in range(len(list(sat_positions.values())[0])):
        for sat_name, sat_traj in sat_positions.items():
            for deb_name, deb_traj in debris_positions.items():
                try:
                    t_s, (x1, y1, z1) = sat_traj[t_idx]
                    t_d, (x2, y2, z2) = deb_traj[t_idx]

                    if abs((t_s - t_d).total_seconds()) < 1:
                        dist = sqrt((x1 - x2)**2 + (y1 - y2)**2 + (z1 - z2)**2)
                        label = 1 if dist < 500 else 0
                        all_distances.append({
                            "timestamp": t_s,
                            "satellite": sat_name,
                            "debris": deb_name,
                            "sat_x": x1,
                            "sat_y": y1,
                            "sat_z": z1,
                            "deb_x": x2,
                            "deb_y": y2,
                            "deb_z": z2,
                            "distance_km": dist,
                            "label": label
                        })
                except Exception as e:
                    print(f"Error at index {t_idx}: {e}")

    return pd.DataFrame(all_distances)


In [None]:
import pandas as pd

df = compute_distances(sat_positions, debris_positions)
df.head()



Unnamed: 0,timestamp,satellite,debris,sat_x,sat_y,sat_z,deb_x,deb_y,deb_z,distance_km,label
0,2025-07-08 07:11:07.329389,CALSPHERE 1,FENGYUN 1C,1620.376867,3249.10665,6409.63571,-1831.307711,-6816.825158,-1417.158238,13209.686335,0
1,2025-07-08 07:11:07.329389,CALSPHERE 1,FENGYUN 1C DEB,1620.376867,3249.10665,6409.63571,-3125.267416,-1162.702874,6346.78365,6479.903804,0
2,2025-07-08 07:11:07.329389,CALSPHERE 2,FENGYUN 1C,-1635.06158,-3928.934792,-6112.583069,-1831.307711,-6816.825158,-1417.158238,5515.92582,0
3,2025-07-08 07:11:07.329389,CALSPHERE 2,FENGYUN 1C DEB,-1635.06158,-3928.934792,-6112.583069,-3125.267416,-1162.702874,6346.78365,12849.458023,0
4,2025-07-08 07:11:07.329389,LCS 1,FENGYUN 1C,-4095.044477,7893.40697,-2260.447029,-1831.307711,-6816.825158,-1417.158238,14907.265658,0


In [None]:
# Add relative coordinates (feature engineering)
df["rel_x"] = df["sat_x"] - df["deb_x"]
df["rel_y"] = df["sat_y"] - df["deb_y"]
df["rel_z"] = df["sat_z"] - df["deb_z"]

# Optional: scale distance_km or relative positions
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
df_scaled = df.copy()
df_scaled[["distance_km", "rel_x", "rel_y", "rel_z"]] = scaler.fit_transform(
    df[["distance_km", "rel_x", "rel_y", "rel_z"]])


In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

df_balanced = pd.concat([
    df[df["label"] == 0].sample(100),
    df[df["label"] == 1]
])

print(df["label"].value_counts())
X = df_scaled[["rel_x", "rel_y", "rel_z", "distance_km"]]
y = df_scaled["label"]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

model = RandomForestClassifier()
model.fit(X_train, y_train)

y_pred = model.predict(X_test)
print(classification_report(y_test, y_pred))


label
0    23985
1       15
Name: count, dtype: int64
              precision    recall  f1-score   support

           0       1.00      1.00      1.00      7196
           1       1.00      0.75      0.86         4

    accuracy                           1.00      7200
   macro avg       1.00      0.88      0.93      7200
weighted avg       1.00      1.00      1.00      7200



In [None]:
df_balanced.to_csv("collision_dataset_500km.csv", index=False)
import pickle
from sklearn.pipeline import Pipeline
df = pd.read_csv("collision_dataset_500km.csv")

X = df[["rel_x", "rel_y", "rel_z", "distance_km"]]
y = df["label"]

# Build the pipeline
pipeline = Pipeline([
    ("scaler", StandardScaler()),
    ("classifier", RandomForestClassifier())
])

# Fit the pipeline
pipeline.fit(X, y)

# Save the pipeline
with open("collision_pipeline.pkl", "wb") as f:
    pickle.dump(pipeline, f)
