In [None]:
import os
import pandas as pd

# Define the base file path and template
base_path = "tle_data_first" 
file_template = "_{}.csv"

# Generate a list of file paths
file_paths = [os.path.join(base_path, file_template.format(i)) for i in range(1000, 59000, 1000)]

# Initialize an empty list to store DataFrames
dfs = []


for file_path in file_paths:
    file_path = file_path.replace('/', '')
    # Since we're joining paths, there's no need to remove '/' from file_path
    if os.path.exists(file_path) and os.path.getsize(file_path) > 0:  
        print(f"Reading file: {file_path}")  # Print the file path for debugging
        # Read the file with the correct delimiter and no header assumption
        df = pd.read_csv(file_path, delimiter=',', skiprows = 1, header=None, encoding='utf-8')
        # Ensure the DataFrame has the correct column names
        df.columns = ['NORAD_CAT_ID', 'TLE_LINE1', 'TLE_LINE2']
        dfs.append(df)

# Merge all non-empty DataFrames
if dfs:
    merged_df = pd.concat(dfs, ignore_index=True)
else:
    print("No valid data files found.")


In [None]:
# Extracting Timestamp from the TLE



import pandas as pd
import datetime

#lists to store the extracted data
exact_timestamps = []

# Select the number of batches based on the batch size
batch_size = 10000
total_rows = merged_df.shape[0]
num_batches = (total_rows // batch_size) + (1 if total_rows % batch_size != 0 else 0)

for batch_num in range(num_batches):
    start_index = batch_num * batch_size
    end_index = min((batch_num + 1) * batch_size, total_rows)
    print(f"Processing batch {batch_num + 1}/{num_batches}, Rows: {start_index} to {end_index}")

    
    for index in range(start_index, end_index):
        row = merged_df.iloc[index]

        # Extract year and full epoch day (including fractional day) from TLE line
        year_str = row['TLE_LINE1'][18:20].strip()
        epoch_day_str = row['TLE_LINE1'][20:32].strip()  
        
        # Convert to integers, adjusting for century
        year = int(year_str) + (1900 if int(year_str) >= 57 else 2000)

        # Split epoch day into day of year and fractional day
        try:
            day_of_year, frac_day_str = epoch_day_str.split('.')
            day_of_year = int(day_of_year)
            frac_day = float('0.' + frac_day_str)

            # Calculate the date from year and day of year
            date = datetime.datetime(year, 1, 1) + datetime.timedelta(days=day_of_year - 1)

            # Convert fractional day to hours, minutes, and seconds
            total_seconds = int(frac_day * 86400)
            hours = total_seconds // 3600
            minutes = (total_seconds % 3600) // 60
            seconds = total_seconds % 60

            # Combine date and time into a single datetime object
            exact_timestamp = datetime.datetime(year, date.month, date.day, hours, minutes, seconds)
            exact_timestamps.append(exact_timestamp)
        except ValueError as e:
            print(f"Error at index {index}: {e}, Line1: {row['TLE_LINE1']}")
            exact_timestamps.append(None)

# Add the extracted data to the DataFrame
merged_df['Exact_Timestamp'] = exact_timestamps
merged_df['Exact_Date'] = merged_df['Exact_Timestamp'].dt.date
merged_df['Exact_Time'] = merged_df['Exact_Timestamp'].dt.time


sorted_df = merged_df.sort_values(by=['NORAD_CAT_ID', 'Exact_Timestamp'])


print(sorted_df.head())


In [None]:
# Check for missing values
missing_data = sorted_df.isnull().sum()
print(missing_data[missing_data > 0])  


In [None]:
# Extracting Orbital Parameters


import pandas as pd


# Add a new column for flagging corrupted data
sorted_df['Corrupted'] = False

def extract_and_validate_tle_elements(row):
    # Extracting elements from TLE_LINE2 and initializing flag as False
    elements = {'inclination': float(row['TLE_LINE2'][8:16]),
                'raan': float(row['TLE_LINE2'][17:25]),
                'eccentricity': float('0.' + row['TLE_LINE2'][26:33]),
                'argument_of_perigee': float(row['TLE_LINE2'][34:42]),
                'mean_anomaly': float(row['TLE_LINE2'][43:51])}
    corrupted = False
    
    # Range checks with flagging
    if not 0 <= elements['inclination'] <= 180:
        corrupted = True
    if not 0 <= elements['raan'] <= 360:
        corrupted = True
    if not 0 <= elements['eccentricity'] < 1:
        corrupted = True
    if not 0 <= elements['argument_of_perigee'] <= 360:
        corrupted = True
    if not 0 <= elements['mean_anomaly'] <= 360:
        corrupted = True
    
    return corrupted

# flag corrupted entries
for index, row in sorted_df.iterrows():
    if extract_and_validate_tle_elements(row):
        sorted_df.at[index, 'Corrupted'] = True


corrupted_data = sorted_df[sorted_df['Corrupted'] == True]


In [None]:
def extract_mean_motion(tle_line2):
    # Extracting mean motion from TLE_LINE2
    return float(tle_line2[52:63].strip())

# extract mean motion for each row
sorted_df['mean_motion'] = sorted_df['TLE_LINE2'].apply(extract_mean_motion)


def extract_eccentricity(tle_line2):
    # Eccentricity is given starting from the 27th character in TLE_LINE2 for 7 characters
    # A leading decimal point is assumed, so it's added manually
    eccentricity_str = tle_line2[26:33].strip()
    eccentricity = float('0.' + eccentricity_str)
    return eccentricity

# extract eccentricity for each row and create the 'eccentricity' column
sorted_df['eccentricity'] = sorted_df['TLE_LINE2'].apply(extract_eccentricity)


import numpy as np

# Gravitational constant for Earth (mu = GM, in km^3/s^2)
mu_earth = 398600.4418  

def calculate_semi_major_axis(mean_motion):
    # Convert mean motion from revs per day to radians per second
    mean_motion_rad_s = mean_motion * 2 * np.pi / 86400
    # Calculate semi-major axis from mean motion
    a = (mu_earth / (mean_motion_rad_s ** 2)) ** (1/3)
    return a  # in kilometers

def calculate_orbital_period(semi_major_axis):
    # Calculate orbital period from semi-major axis
    T = 2 * np.pi * np.sqrt(semi_major_axis**3 / mu_earth)
    return T  # in seconds


sorted_df['Semi_Major_Axis'] = sorted_df['mean_motion'].apply(calculate_semi_major_axis)
sorted_df['Orbital_Period'] = sorted_df['Semi_Major_Axis'].apply(calculate_orbital_period)

def calculate_apogee_perigee(semi_major_axis, eccentricity):
    apogee = semi_major_axis * (1 + eccentricity) - 6371  # Earth's radius in km subtracted to get altitude
    perigee = semi_major_axis * (1 - eccentricity) - 6371
    return apogee, perigee


sorted_df[['Apogee', 'Perigee']] = sorted_df.apply(lambda row: calculate_apogee_perigee(row['Semi_Major_Axis'], row['eccentricity']), axis=1, result_type='expand')


from skyfield.api import load, EarthSatellite
import numpy as np

ts = load.timescale()

def extract_orbital_elements(tle_line1, tle_line2):
    satellite = EarthSatellite(tle_line1, tle_line2)
    
    inclination = satellite.model.inclo  # Inclination in radians
    raan = satellite.model.nodeo  # Right Ascension of Ascending Node in radians
    
    inclination_deg = np.degrees(inclination)
    raan_deg = np.degrees(raan)
    
    return inclination_deg, raan_deg

def add_additional_params(df):
    inclinations, raans = [], []
    
    for _, row in df.iterrows():
        inclination, raan = extract_orbital_elements(row['TLE_LINE1'], row['TLE_LINE2'])
        inclinations.append(inclination)
        raans.append(raan)
    
    df['Inclination'] = inclinations
    df['RAAN'] = raans
    return df


sorted_df = add_additional_params(sorted_df)



In [None]:
sorted_df.rename(columns={'Exact_Timestamp':'Timestamp','Exact_Date':'Epoch_Date','Exact_Time':'Epoch_Time','mean_motion':'Mean_Motion','eccentricity':'Eccentricity'}, inplace = True)

In [None]:
# Extract Latitude and Lngitude using SKyfield library


from skyfield.api import load, EarthSatellite
from datetime import datetime

ts = load.timescale()

def extract_lat_lon_alt(tle_line1, tle_line2, epoch_date, epoch_time):
    
    # Combine date and time into a single datetime object
    epoch_datetime = datetime.strptime(f'{epoch_date} {epoch_time}', '%Y-%m-%d %H:%M:%S')
    
    satellite = EarthSatellite(tle_line1, tle_line2)
    time = ts.utc(epoch_datetime.year, epoch_datetime.month, epoch_datetime.day, 
                  epoch_datetime.hour, epoch_datetime.minute, epoch_datetime.second)
    geocentric = satellite.at(time)
    subpoint = geocentric.subpoint()
    distance_from_center = geocentric.distance().km  # Distance from the Earth's center in kilometers
    altitude = distance_from_center - 6371  # Subtracting average Earth radius to get altitude above surface
    
    return subpoint.latitude.degrees, subpoint.longitude.degrees, altitude

def add_orbital_params(df, batch_size=10000):
    latitudes = []
    longitudes = []
    altitudes = []
    
    # Process data in batches
    for index in range(0, len(df), batch_size):
        batch_df = df.iloc[index:index+batch_size]
        for _, row in batch_df.iterrows():
            lat, lon, alt = extract_lat_lon_alt(row['TLE_LINE1'], row['TLE_LINE2'], 
                                                 row['Epoch_Date'], row['Epoch_Time'])
            latitudes.append(lat)
            longitudes.append(lon)
            altitudes.append(alt)
        print(f'Processed {index+batch_size} records')

    df['Latitude'] = latitudes
    df['Longitude'] = longitudes
    df['Altitude'] = altitudes
    return df


sorted_df = add_orbital_params(sorted_df)


In [None]:
# Filtering the data with each Norad ID having 1000 records


#Selecting only the 1000 records


filtered_df = sorted_df.groupby('NORAD_CAT_ID').filter(lambda x: len(x) >= 1000)

print(filtered_df.shape,filtered_df['NORAD_CAT_ID'].nunique())

filtered_df = filtered_df.groupby('NORAD_CAT_ID').head(1000)

In [None]:
# Adding additional Features


# Convert 'Timestamp' column to datetime 
filtered_df['Timestamp'] = pd.to_datetime(filtered_df['Timestamp'])

# Ensure the DataFrame is sorted by 'Timestamp' to correctly compute differences
filtered_df.sort_values(by=['NORAD_CAT_ID','Timestamp'], inplace=True)

# Calculate time differences between consecutive records in seconds
filtered_df['Time_Diffs'] = filtered_df['Timestamp'].diff().dt.total_seconds().fillna(0)

# Calculate changes in position (differences) for Latitude, Longitude, and Altitude
filtered_df['Lat_Diffs'] = filtered_df['Latitude'].diff().fillna(0)
filtered_df['Lon_Diffs'] = filtered_df['Longitude'].diff().fillna(0)
filtered_df['Alt_Diffs'] = filtered_df['Altitude'].diff().fillna(0)

# Compute velocity components by dividing position changes by time differences
filtered_df['Vel_Latitude'] = filtered_df['Lat_Diffs'] / filtered_df['Time_Diffs']
filtered_df['Vel_Longitude'] = filtered_df['Lon_Diffs'] / filtered_df['Time_Diffs']
filtered_df['Vel_Altitude'] = filtered_df['Alt_Diffs'] / filtered_df['Time_Diffs']

# Handle potential divide by zero or NaN issues from fillna(0)
filtered_df.replace([np.inf, -np.inf], np.nan, inplace=True)
filtered_df.fillna(0, inplace=True)  # Setting NaNs to 0; adjust based on your needs




In [None]:
def extract_bstar(tle_line):
    # Extract the BSTAR string from the TLE line
    bstar_str = tle_line[53:61].strip()

    # Check for the presence of an exponent sign (+ or -)
    if ' ' in bstar_str:
        # If there's a space, it's likely that the exponent part is missing
        # We'll use a default exponent of -4 in this case
        bstar_str = bstar_str.replace(' ', '0')  # Replace the space with '0'
        exponent = 'E-4'
    else:
        # If there's no space, the last two characters should represent the exponent
        exponent = 'E' + bstar_str[-2:]
        bstar_str = bstar_str[:-2]

    # Ensure the BSTAR string starts with a sign
    if bstar_str[0] not in ['+', '-']:
        bstar_str = '+' + bstar_str

    # Reconstruct the BSTAR value in scientific notation
    bstar_formatted = f"{bstar_str[0]}0.{bstar_str[1:]}{exponent}"

    return float(bstar_formatted)

def extract_tle_features(df):
    df['Arg_of_Perigee'] = df['TLE_LINE2'].str.slice(34, 42).astype(float)
    df['Mean_Anomaly'] = df['TLE_LINE2'].str.slice(43, 51).astype(float)
    df['BSTAR'] = df['TLE_LINE1'].apply(extract_bstar)
    df['Rev_at_Epoch'] = df['TLE_LINE2'].str.slice(63, 68).astype(float)
    return df


filtered_df = extract_tle_features(filtered_df)

filtered_df['Timestamp'] = pd.to_datetime(filtered_df['Timestamp'])
filtered_df.sort_values(by=['NORAD_CAT_ID', 'Timestamp'], inplace=True)

# Calculate the first derivative of Mean Motion
filtered_df['Mean_Motion_Dot'] = filtered_df.groupby('NORAD_CAT_ID')['Mean_Motion'].diff() / filtered_df.groupby('NORAD_CAT_ID')['Timestamp'].diff().dt.total_seconds()

# Calculate the second derivative of Mean Motion
filtered_df['Mean_Motion_Dot_Dot'] = filtered_df.groupby('NORAD_CAT_ID')['Mean_Motion_Dot'].diff() / filtered_df.groupby('NORAD_CAT_ID')['Timestamp'].diff().dt.total_seconds()



# Convert degrees to radians for cyclical transformation
filtered_df['RAAN_rad'] = np.radians(filtered_df['RAAN'])
filtered_df['Arg_of_Perigee_rad'] = np.radians(filtered_df['Arg_of_Perigee'])
filtered_df['Mean_Anomaly_rad'] = np.radians(filtered_df['Mean_Anomaly'])

# Cyclical transformations
filtered_df['RAAN_cos'] = np.cos(filtered_df['RAAN_rad'])
filtered_df['RAAN_sin'] = np.sin(filtered_df['RAAN_rad'])
filtered_df['Arg_of_Perigee_cos'] = np.cos(filtered_df['Arg_of_Perigee_rad'])
filtered_df['Arg_of_Perigee_sin'] = np.sin(filtered_df['Arg_of_Perigee_rad'])
filtered_df['Mean_Anomaly_cos'] = np.cos(filtered_df['Mean_Anomaly_rad'])
filtered_df['Mean_Anomaly_sin'] = np.sin(filtered_df['Mean_Anomaly_rad'])


def calculate_rate_of_change(df, feature_name):
    # Calculate the difference in the feature between consecutive rows
    df[feature_name + '_Diff'] = df.groupby('NORAD_CAT_ID')[feature_name].diff()
    
    # Calculate the difference in time between consecutive rows in hours
    df['Time_Diff_Hours'] = df.groupby('NORAD_CAT_ID')['Timestamp'].diff().dt.total_seconds() / 3600
    
    # Calculate the rate of change as feature difference divided by time difference
    df[feature_name + '_Rate_of_Change'] = df[feature_name + '_Diff'] / df['Time_Diff_Hours']
    
    # Fill NaN values with 0 for the rate of change
    df[feature_name + '_Rate_of_Change'] = df[feature_name + '_Rate_of_Change'].fillna(0)
    
    # Clean up by dropping the intermediate calculation columns
    df.drop(columns=[feature_name + '_Diff', 'Time_Diff_Hours'], inplace=True)
    
    return df

# Features to calculate the rate of change for
features_to_calculate = [
    'Altitude', 'Mean_Motion', 'Eccentricity', 'Vel_Latitude', 'Vel_Longitude', 'Vel_Altitude'
]

# Calculate the rate of change for each feature in the list
for feature in features_to_calculate:
    filtered_df = calculate_rate_of_change(filtered_df, feature)


filtered_df.head()


import pandas as pd

# Convert 'Timestamp' to datetime if it's not already
filtered_df['Timestamp'] = pd.to_datetime(filtered_df['Timestamp'])

# Now that 'Timestamp' is a datetime object, you can access .dt attributes
filtered_df['Day_of_Year'] = filtered_df['Timestamp'].dt.dayofyear

# Cyclical transformation for day of the year
filtered_df['Day_of_Year_sin'] = np.sin(2 * np.pi * filtered_df['Day_of_Year'] / 365)
filtered_df['Day_of_Year_cos'] = np.cos(2 * np.pi * filtered_df['Day_of_Year'] / 365)


filtered_df.replace([np.inf, -np.inf], np.nan, inplace=True)
filtered_df.fillna(0,inplace=True)

In [None]:
filtered_df.columns

## GDBT 

In [None]:
# Splitting each Norad id into train,val and test


import pandas as pd
from sklearn.model_selection import train_test_split

# Initialize empty lists to store DataFrames for training, validation, and test sets
train_dfs = []
val_dfs = []
test_dfs = []

# Iterate over each unique NORAD ID
for norad_id in filtered_df['NORAD_CAT_ID'].unique():
    # Filter the DataFrame for the current NORAD ID
    norad_df = filtered_df[filtered_df['NORAD_CAT_ID'] == norad_id]
    
    # Split the data for this NORAD ID into training and temp sets (temp combines validation and test)
    train_temp, temp = train_test_split(norad_df, test_size=0.4, random_state=42)
    
    # Split the temp set into validation and test sets
    val, test = train_test_split(temp, test_size=0.5, random_state=42)
    
    
    train_dfs.append(train_temp)
    val_dfs.append(val)
    test_dfs.append(test)

# Concatenate the lists of DataFrames to form final training, validation, and test DataFrames
final_train_df = pd.concat(train_dfs, ignore_index=True)
val_df = pd.concat(val_dfs, ignore_index=True)
test_df = pd.concat(test_dfs, ignore_index=True)



In [None]:
# Converting 'Epoch_Date' to ordinal number

final_train_df['Epoch_Date'] = final_train_df['Epoch_Date'].apply(lambda x: x.toordinal())
val_df['Epoch_Date'] = val_df['Epoch_Date'].apply(lambda x: x.toordinal())
test_df['Epoch_Date'] = test_df['Epoch_Date'].apply(lambda x: x.toordinal())

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler




# Selecting features and targets
feature_columns = ['Mean_Motion', 'Eccentricity', 'Semi_Major_Axis', 'Orbital_Period', 
                   'Inclination', 'RAAN', 'Arg_of_Perigee', 'Mean_Motion_Dot', 'Mean_Motion_Dot_Dot',
                   'RAAN_rad', 'Arg_of_Perigee_rad', 'Mean_Anomaly_rad', 'RAAN_cos', 'RAAN_sin',
                   'Arg_of_Perigee_cos', 'Arg_of_Perigee_sin', 'Mean_Anomaly_cos', 'Mean_Anomaly_sin',
                   'Altitude_Rate_of_Change', 'Mean_Motion_Rate_of_Change', 'Eccentricity_Rate_of_Change',
                   'Vel_Latitude_Rate_of_Change', 'Vel_Longitude_Rate_of_Change', 'Vel_Altitude_Rate_of_Change',
                   'Day_of_Year_sin', 'Day_of_Year_cos']


# Replace infinite values with NaN
final_train_df.replace([np.inf, -np.inf], np.nan, inplace=True)
val_df.replace([np.inf, -np.inf], np.nan, inplace=True)
test_df.replace([np.inf, -np.inf], np.nan, inplace=True)

# Impute NaN values with the median of the column
for column in feature_columns:
    median_value = final_train_df[column].median()
    final_train_df[column].fillna(median_value, inplace=True)
    val_df[column].fillna(median_value, inplace=True)
    test_df[column].fillna(median_value, inplace=True)

# Normalize feature columns
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(final_train_df[feature_columns])
X_val_scaled = scaler.transform(val_df[feature_columns])
X_test_scaled = scaler.transform(test_df[feature_columns])

# Define target columns
target_columns = ['Mean_Motion', 'Eccentricity', 'Inclination', 'RAAN', 'Arg_of_Perigee']

# Extract target values
Y_train = final_train_df[target_columns].values
Y_val = val_df[target_columns].values
Y_test = test_df[target_columns].values

In [None]:
# Make sure the shapes are right

print("Shape of X_train_scaled:", X_train_scaled.shape)
print("Shape of X_val_scaled:", X_val_scaled.shape)
print("Shape of X_test_scaled:", X_test_scaled.shape)



print("First 5 rows of X_train_scaled:")
print(X_train_scaled[:5])


print("\nFirst 5 rows of X_val_scaled:")
print(X_val_scaled[:5])


print("\nFirst 5 rows of X_test_scaled:")
print(X_test_scaled[:5])


In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error
from sklearn.preprocessing import StandardScaler
import lightgbm as lgb
import numpy as np
import matplotlib.pyplot as plt


# Define your feature and target columns if not already done
feature_columns = ['Mean_Motion_Dot', 'Mean_Motion_Dot_Dot', 'RAAN_rad',
                   'Arg_of_Perigee_rad', 'Mean_Anomaly_rad', 'RAAN_cos', 'RAAN_sin',
                   'Arg_of_Perigee_cos', 'Arg_of_Perigee_sin', 'Mean_Anomaly_cos',
                   'Mean_Anomaly_sin', 'Altitude_Rate_of_Change', 'Mean_Motion_Rate_of_Change',
                   'Eccentricity_Rate_of_Change', 'Vel_Latitude_Rate_of_Change', 'Vel_Longitude_Rate_of_Change',
                   'Vel_Altitude_Rate_of_Change', 'Day_of_Year_sin', 'Day_of_Year_cos','Mean_Motion', 'Eccentricity', 'Inclination', 'RAAN', 'Arg_of_Perigee']

# Define target variables
target_columns = ['Latitude', 'Longitude', 'Altitude']

# Split the data into features and target
X = filtered_df[feature_columns]
y = filtered_df[target_columns]

# Split into train, validation, and test sets
X_temp, X_test, y_temp, y_test = train_test_split(X, y, test_size=0.1, random_state=42)  # Test split
X_train, X_val, y_train, y_val = train_test_split(X_temp, y_temp, test_size=0.1, random_state=42)  # Validation split

# Normalize features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_val_scaled = scaler.transform(X_val)
X_test_scaled = scaler.transform(X_test)

final_mse,final_rmse,final_mae = 0,0,0

# Initialize the model
gbdt_model = lgb.LGBMRegressor()

# Train and evaluate the model for each target variable
for target in target_columns:
    # Train the model
    gbdt_model.fit(X_train_scaled, y_train[target])
    
    # Predict on the validation set
    y_val_pred = gbdt_model.predict(X_val_scaled)
    val_mse = mean_squared_error(y_val[target], y_val_pred)
    val_rmse = np.sqrt(val_mse)
    val_mae = mean_absolute_error(y_val[target], y_val_pred)
    print(f"Validation MSE: {val_mse}, RMSE: {val_rmse}, MAE: {val_mae}")

    # Predict on the test set
    y_test_pred = gbdt_model.predict(X_test_scaled)
    test_mse = mean_squared_error(y_test[target], y_test_pred)
    test_rmse = np.sqrt(test_mse)
    test_mae = mean_absolute_error(y_test[target], y_test_pred)

    # Aggregate final metrics
    final_mse += test_mse
    final_rmse += test_rmse
    final_mae += test_mae

    # Plotting Actual vs. Predicted Values for the validation set
    plt.figure(figsize=(10, 6))
    plt.scatter(y_val[target], y_val_pred, alpha=0.3)
    plt.plot([y_val[target].min(), y_val[target].max()], [y_val[target].min(), y_val[target].max()], 'k--', lw=2)
    plt.xlabel(f'Actual {target}')
    plt.ylabel(f'Predicted {target}')
    plt.title(f'Validation - Actual vs Predicted {target}')
    plt.show()

# Calculate the average final metrics
num_targets = len(target_columns)
avg_final_mse = final_mse / num_targets
avg_final_rmse = final_rmse / num_targets
avg_final_mae = final_mae / num_targets

# Print the average final metrics
print(f"Average Final Metrics on Test Set:\nMSE: {avg_final_mse}\nRMSE: {avg_final_rmse}\nMAE: {avg_final_mae}")


## Kalman Filters

In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error
from filterpy.kalman import UnscentedKalmanFilter, MerweScaledSigmaPoints
import matplotlib.pyplot as plt
from scipy.stats import gaussian_kde, zscore
import seaborn as sns


# Normalize input features
scaler = StandardScaler()
scaled_columns = scaler.fit_transform(filtered_df[['Latitude', 'Longitude', 'Altitude']])
filtered_df[['Latitude_scaled', 'Longitude_scaled', 'Altitude_scaled']] = scaled_columns

# Initialize Unscented Kalman Filter
initial_state_mean = np.zeros(6)
initial_state_covariance = np.eye(6)
sigma_points = MerweScaledSigmaPoints(n=6, alpha=1.0, beta=2.0, kappa=0.0)
ukf = UnscentedKalmanFilter(dim_x=6, dim_z=3, dt=1., fx=lambda x, dt: np.dot(np.eye(6), x), hx=lambda x: x[:3], points=sigma_points)
ukf.x = initial_state_mean
ukf.P = initial_state_covariance

# Collect all predictions and actual measurements
all_predicted_states = []
all_actual_measurements = []

for norad_id, data in filtered_df.groupby('NORAD_CAT_ID'):
    predicted_states = []
    actual_measurements = []

    for _, row in data.iterrows():
        ukf.predict()
        ukf.update(row[['Latitude_scaled', 'Longitude_scaled', 'Altitude_scaled']].to_numpy())
        estimated_state = ukf.x[:3]
        actual_state = row[['Latitude_scaled', 'Longitude_scaled', 'Altitude_scaled']].to_numpy()
        predicted_states.append(estimated_state)
        actual_measurements.append(actual_state)

    all_predicted_states.extend(predicted_states)
    all_actual_measurements.extend(actual_measurements)

# Convert lists to numpy arrays for easier manipulation
all_predicted_array = np.array(all_predicted_states)
all_actual_array = np.array(all_actual_measurements)

# Inverse transform the predictions and actuals back to their original scale
all_predicted_array = scaler.inverse_transform(all_predicted_array)
all_actual_array = scaler.inverse_transform(all_actual_array)

# Calculate evaluation metrics on the inverse transformed data
mse = mean_squared_error(all_actual_array, all_predicted_array, multioutput='raw_values')
rmse = np.sqrt(mse)
mae = mean_absolute_error(all_actual_array, all_predicted_array, multioutput='raw_values')

# Print evaluation metrics
print("Evaluation Metrics on Inverse Transformed Data:")
print("MSE (Latitude, Longitude, Altitude):", mse)
print("RMSE (Latitude, Longitude, Altitude):", rmse)
print("MAE (Latitude, Longitude, Altitude):", mae)

# Plot actual vs predicted values using the inverse transformed data
plt.figure(figsize=(18, 6))
titles = ['Latitude', 'Longitude', 'Altitude']
colors = ['blue', 'green', 'red']
for i in range(3):
    plt.subplot(1, 3, i+1)
    sns.scatterplot(x=all_actual_array[:, i], y=all_predicted_array[:, i], alpha=0.5, color=colors[i])
    plt.plot([all_actual_array[:, i].min(), all_actual_array[:, i].max()], [all_actual_array[:, i].min(), all_actual_array[:, i].max()], 'r--', color='black')
    plt.title(f'Actual vs Predicted {titles[i]}')
    plt.xlabel(f'Actual {titles[i]}')
    plt.ylabel(f'Predicted {titles[i]}')
plt.tight_layout()
plt.show()


## LSTM

In [None]:
import pandas as pd
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.layers import LSTM, Dropout, BatchNormalization, Dense
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras.regularizers import l1
from keras.layers import Bidirectional, Attention
from keras.layers import GlobalAveragePooling1D

# Define features and targets
features_list = ['Mean_Motion', 'Eccentricity','Semi_Major_Axis', 'Orbital_Period', 'Apogee', 'Perigee', 'Inclination','RAAN']
features_list += ['Epoch_Year', 'Epoch_Day', 'hour_sin', 'hour_cos', 'day_of_week_sin', 'day_of_week_cos']
targets = ['Latitude', 'Longitude']

# Split data into training, validation, and testing sets
train_end = int(0.7 * len(norad_data))
val_end = train_end + int(0.15 * len(norad_data))

train_df = norad_data.iloc[:train_end]
val_df = norad_data.iloc[train_end:val_end]
test_df = norad_data.iloc[val_end:]

# Scale features and targets
feature_scaler = StandardScaler()
target_scaler = StandardScaler()

train_features = feature_scaler.fit_transform(train_df[features_list])
val_features = feature_scaler.transform(val_df[features_list])
test_features = feature_scaler.transform(test_df[features_list])

train_targets = target_scaler.fit_transform(train_df[targets])
val_targets = target_scaler.transform(val_df[targets])
test_targets = target_scaler.transform(test_df[targets])

# Reshape features for LSTM input
train_features = train_features.reshape((train_features.shape[0], 1, train_features.shape[1]))
val_features = val_features.reshape((val_features.shape[0], 1, val_features.shape[1]))
test_features = test_features.reshape((test_features.shape[0], 1, test_features.shape[1]))

features_input = Input(shape=(1, 14))

lstm_out = Bidirectional(LSTM(100, return_sequences=True, kernel_regularizer=l2(0.5)))(features_input)
lstm_out = Dropout(0.5)(lstm_out)
lstm_out = BatchNormalization()(lstm_out)

lstm_out = Bidirectional(LSTM(100, return_sequences=True, kernel_regularizer=l2(0.5)))(lstm_out)
lstm_out = Dropout(0.5)(lstm_out)
lstm_out = BatchNormalization()(lstm_out)

lstm_out = Bidirectional(LSTM(100, return_sequences=True, kernel_regularizer=l2(0.5)))(lstm_out)
lstm_out = Dropout(0.5)(lstm_out)
lstm_out = BatchNormalization()(lstm_out)

lstm_out = Bidirectional(LSTM(100, kernel_regularizer=l2(0.5)))(lstm_out)
lstm_out = Dropout(0.5)(lstm_out)
lstm_out = BatchNormalization()(lstm_out)


lstm_out = Dense(128, activation='relu')(lstm_out)
lstm_out = Dropout(0.5)(lstm_out)
lstm_out = BatchNormalization()(lstm_out)

output = Dense(2, activation='linear')(lstm_out)

model3 = Model(inputs=features_input, outputs=output)

# Compile model
model3.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])

# Define callbacks
early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=3, min_lr=1e-6, verbose=1)

# Train model
history = model3.fit(
    train_features,
    train_targets,
    epochs=20,
    validation_data=(val_features, val_targets),
    callbacks=[early_stopping, reduce_lr],
    batch_size=64,
    verbose=1
)

# model summary
model3.summary()


# 1. Plot training and validation loss
plt.figure(figsize=(10, 5))
plt.plot(history.history['loss'], label='Train Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss')
plt.ylabel('Loss')
plt.xlabel('Epochs')
plt.legend(loc='upper right')
plt.show()