In [None]:
!pip install numba
!pip install sgp4

Import Required Libraries

In [None]:
import numpy as np
from sgp4.api import Satrec, WGS72
from sgp4.api import jday
import datetime
import pyproj
from shapely.geometry import Point, Polygon
from numba import cuda
import matplotlib.pyplot as plt
import math

Read TLE Data

In [None]:
def read_tle_file(file_path):
    """Read TLE data from the given file and return a list of tuples (name, line1, line2)."""
    with open(file_path, 'r') as file:
        lines = file.readlines()

    satellites = []
    for i in range(0, len(lines), 3):
        name = lines[i].strip()  # Satellite name
        line1 = lines[i + 1].strip()  # First TLE line
        line2 = lines[i + 2].strip()  # Second TLE line
        satellites.append((name, line1, line2))

    return satellites


Convert TLE Epoch to Julian Date

In [None]:
def tle_epoch_to_julian_date(epoch):
    """Convert TLE epoch to Julian Date."""
    year = int(epoch[:2])
    if year < 57:  # TLE epoch year format
        year += 2000
    else:
        year += 1900

    day_of_year = int(epoch[2:5])  # Day of the year
    fraction_of_day = float(epoch[5:])  # Fraction of the day

    # Convert year and day_of_year to a date
    date = datetime.datetime(year, 1, 1) + datetime.timedelta(days=day_of_year - 1)

    # Julian Date conversion
    julian_date = date.toordinal() + 1721424.5 + fraction_of_day

    return julian_date

def julian_date_to_datetime(jd):
    """Convert Julian Date to a datetime object."""
    jd = jd + 0.5
    F, I = np.modf(jd)
    I = int(I)

    A = I
    if I >= 2299161:
        alpha = int((I - 1867216.25) / 36524.25)
        A = I + 1 + alpha - int(alpha / 4)

    B = A + 1524
    C = int((B - 122.1) / 365.25)
    D = int(365.25 * C)
    E = int((B - D) / 30.6001)

    day = B - D - int(30.6001 * E) + F
    month = E - 1 if E < 14 else E - 13
    year = C - 4716 if month > 2 else C - 4715

    hours, minutes = divmod(day % 1 * 24, 1)
    minutes, seconds = divmod(minutes * 60, 1)
    seconds = round(seconds * 60, 2)

    return datetime.datetime(int(year), int(month), int(day)) + datetime.timedelta(
        hours=int(hours), minutes=int(minutes), seconds=seconds
    )


Get Satellite Positions

In [None]:
def get_satellite_positions(n, satrec, jd, fr, interval_minutes, cnt):
    """Retrieve satellite positions and velocities from TLE epoch (Julian Date)."""
    times = []
    positions = []
    velocities = []

    current_time = jd  # Start from the TLE epoch (Julian Date)
    end_time = jd + 1  # Process for one day (1 Julian day)

    while current_time <= end_time:
        try:
            e, r, v = satrec.sgp4(current_time, fr)  # Get position and velocity

            if e == 0:  # Success
                times.append(current_time)
                positions.append(r)
                velocities.append(v)
            else:
                print(f"{n} Error code {e} for JD {current_time}")
        except Exception as ex:
            print(f"Exception occurred for JD {current_time}: {ex}")

        # Increment current_time by the interval in fractional days
        current_time += interval_minutes / 1440  # Convert minutes to fractional days

    return times, positions, velocities


Read Filtered Positions from File

In [None]:
def read_filtered_positions(file_path):
    """Read the satellite positions from the file and return as a list of tuples (lon, lat, alt)."""
    filtered_positions = []
    with open(file_path, 'r') as file:
        lines = file.readlines()

    for line in lines:
        _, _, lon, lat, alt = line.strip().split(',')
        filtered_positions.append((float(lon), float(lat), float(alt)))

    return filtered_positions


CUDA Device Function for Point in Polygon

In [None]:
def is_within_polygon(lon, lat, polygon):
    """Check if a point (lon, lat) is inside the polygon."""
    point = Point(lon, lat)
    return polygon.contains(point)

@cuda.jit(device=True)
def point_in_polygon(x, y, polygon_x, polygon_y):
    """Determine if a point is inside a polygon using the ray-casting algorithm."""
    num_vertices = len(polygon_x)
    odd_nodes = False

    j = num_vertices - 1
    for i in range(num_vertices):
        if ((polygon_y[i] < y and polygon_y[j] >= y) or (polygon_y[j] < y and polygon_y[i] >= y)) and \
                (polygon_x[i] <= x or polygon_x[j] <= x):
            if polygon_x[i] + (y - polygon_y[i]) / (polygon_y[j] - polygon_y[i]) * (polygon_x[j] - polygon_x[i]) < x:
                odd_nodes = not odd_nodes
        j = i

    return odd_nodes


CUDA Kernel for Filtering Positions

In [None]:
@cuda.jit
def filter_positions_cuda(lons, lats, alts, polygon_x, polygon_y, filtered_lons, filtered_lats, filtered_alts, results_count):
    idx = cuda.grid(1)
    if idx < lons.size:
        if point_in_polygon(lons[idx], lats[idx], polygon_x, polygon_y):
            pos = cuda.atomic.add(results_count, 0, 1)
            filtered_lons[pos] = lons[idx]
            filtered_lats[pos] = lats[idx]
            filtered_alts[pos] = alts[idx]


def filter_satellite_positions_cuda(positions, polygon):
    """Filter positions to only include those within the polygon using CUDA."""
    lons = np.array([p[0] for p in positions], dtype=np.float32)
    lats = np.array([p[1] for p in positions], dtype=np.float32)
    alts = np.array([p[2] for p in positions], dtype=np.float32)

    filtered_lons = cuda.device_array_like(lons)
    filtered_lats = cuda.device_array_like(lats)
    filtered_alts = cuda.device_array_like(alts)

    results_count = cuda.to_device(np.zeros(1, dtype=np.int32))

    # Extract polygon coordinates
    polygon_x, polygon_y = polygon.exterior.xy
    polygon_x = np.array(polygon_x, dtype=np.float32)
    polygon_y = np.array(polygon_y, dtype=np.float32)

    # Transfer the polygon coordinates to the GPU
    polygon_x_device = cuda.to_device(polygon_x)
    polygon_y_device = cuda.to_device(polygon_y)

    threads_per_block = 256
    blocks_per_grid = (lons.size + threads_per_block - 1) // threads_per_block

    filter_positions_cuda[blocks_per_grid, threads_per_block](
        lons, lats, alts, polygon_x_device, polygon_y_device, filtered_lons, filtered_lats, filtered_alts, results_count
    )

    filtered_lons = filtered_lons.copy_to_host()[:results_count.copy_to_host()[0]]
    filtered_lats = filtered_lats.copy_to_host()[:results_count.copy_to_host()[0]]
    filtered_alts = filtered_alts.copy_to_host()[:results_count.copy_to_host()[0]]

    filtered_positions = [(filtered_lons[i], filtered_lats[i], filtered_alts[i]) for i in range(len(filtered_lons))]

    return filtered_positions


In [None]:
def plot_polygon_and_positions(polygon, positions):
    """Plot the polygon and satellite positions."""
    x, y = polygon.exterior.xy
    plt.plot(x, y, color='blue', label='Polygon')

    lons, lats = zip(*[(lon, lat) for lon, lat, _ in positions])
    plt.scatter(lons, lats, color='red', label='Satellite Positions')

    plt.xlabel('Longitude')
    plt.ylabel('Latitude')
    plt.legend()
    plt.show()

def write_output(name, file_path, times, positions, velocities):
    """Write satellite positions and velocities to the specified file."""
    with open(file_path, 'a') as file:
        for t, pos, vel in zip(times, positions, velocities):
            # Convert Julian Date to datetime and format it
            time_str = julian_date_to_datetime(t).strftime('%Y-%m-%d %H:%M:%S')
            pos_str = ' '.join(f'{x:.4f}' for x in pos)
            vel_str = ' '.join(f'{x:.4f}' for x in vel)
            file.write(f'{name},{time_str}, {pos_str}, {vel_str}\n')


ECEF TO LLA

In [None]:
import math

@cuda.jit(device=True)
def custom_sqrt(x):
    return math.sqrt(x)

@cuda.jit(device=True)
def custom_atan2(y, x):
    return math.atan2(y, x)

@cuda.jit(device=True)
def custom_cos(x):
    return math.cos(x)

@cuda.jit(device=True)
def custom_sin(x):
    return math.sin(x)

@cuda.jit
def ecef_to_lla_kernel(pos_x, pos_y, pos_z, lons, lats, alts):
    idx = cuda.grid(1)
    if idx < pos_x.size:
        # WGS84 ellipsoid parameters
        a = 6378137.0  # semi-major axis
        f = 1 / 298.257223563  # flattening
        e2 = 2 * f - f * f  # square of the first eccentricity

        x = pos_x[idx]
        y = pos_y[idx]
        z = pos_z[idx]
        p = custom_sqrt(x**2 + y**2)

        # Initial guess of latitude
        lat = custom_atan2(z, p * (1 - e2))
        N = a / custom_sqrt(1 - e2 * custom_sin(lat)**2)
        alt = p / custom_cos(lat) - N

        # Iteratively improve latitude estimate
        for _ in range(5):
            lat_prev = lat
            N = a / custom_sqrt(1 - e2 * custom_sin(lat)**2)
            alt = p / custom_cos(lat) - N
            lat = custom_atan2(z + e2 * N * custom_sin(lat), p)
            if abs(lat - lat_prev) < 1e-12:  # Convergence criterion
                break

        lon = custom_atan2(y, x)
        N = a / custom_sqrt(1 - e2 * custom_sin(lat)**2)
        alt = p / custom_cos(lat) - N

        # Convert radians to degrees
        lons[idx] = lon * 180.0 / math.pi
        lats[idx] = lat * 180.0 / math.pi
        alts[idx] = alt

In [None]:
def write_output1_cuda(name, file_path, times, positions):
    """Write satellite positions to the specified file in lat, long, alt format using CUDA."""

    # Extract position components
    pos_x = np.array([p[0] for p in positions], dtype=np.float32)
    pos_y = np.array([p[1] for p in positions], dtype=np.float32)
    pos_z = np.array([p[2] for p in positions], dtype=np.float32)

    # Allocate arrays for the results
    lons = np.zeros_like(pos_x, dtype=np.float32)
    lats = np.zeros_like(pos_x, dtype=np.float32)
    alts = np.zeros_like(pos_x, dtype=np.float32)

    # Transfer data to the GPU
    pos_x_device = cuda.to_device(pos_x)
    pos_y_device = cuda.to_device(pos_y)
    pos_z_device = cuda.to_device(pos_z)
    lons_device = cuda.to_device(lons)
    lats_device = cuda.to_device(lats)
    alts_device = cuda.to_device(alts)

    # Define CUDA grid and block size
    threads_per_block = 256
    blocks_per_grid = (pos_x.size + threads_per_block - 1) // threads_per_block

    # Launch the kernel
    ecef_to_lla_kernel[blocks_per_grid, threads_per_block](pos_x_device, pos_y_device, pos_z_device, lons_device, lats_device, alts_device)

    # Copy the results back to the CPU
    lons = lons_device.copy_to_host()
    lats = lats_device.copy_to_host()
    alts = alts_device.copy_to_host()

    # Write the results to the file
    with open(file_path, 'a') as file:
        for i, t in zip(range(len(positions)), times):
            time_str = julian_date_to_datetime(t).strftime('%Y-%m-%d %H:%M:%S')
            file.write(f'{name},{time_str}, {lons[i]:.4f}, {lats[i]:.4f}, {alts[i]:.4f}\n')
    return lons, lats, alts


Main Function

In [None]:
def main():
    tle_file = '/content/30sats.txt'  # Input TLE file
    output_file = 'satellite_positions.txt'
    output_file1 = 'satellite_positions_next.txt'  # Output file
    interval_minutes = 1  # Time interval in minutes

    filtered_output_file = 'filtered_satellite_positions.txt'

    # Read TLE data
    satellites = read_tle_file(tle_file)

    # Process each satellite
    for name, line1, line2 in satellites:
        try:
            # Create satellite record
            satrec = Satrec.twoline2rv(line1, line2, WGS72)

            # Extract epoch from line1 and convert to Julian Date
            tle_epoch = line1[18:32]  # Epoch is found in columns 19-32 of line1
            jd = tle_epoch_to_julian_date(tle_epoch)
            fr = jd % 1  # Fractional day

            times, positions, velocities = get_satellite_positions(name, satrec, jd, fr, interval_minutes, 0)

            lons, lats, alts = write_output1_cuda(name, output_file1, times, positions)
            print(f'Processed {name}')
        except Exception as e:
            print(f"Failed to process satellite {name}: {e}")
    filtered_positions = read_filtered_positions(output_file1)

    lat1, lon1 = 90.74973, 100.64459 # Top-left corner (latitude, longitude)
    lat2, lon2 = -87.32309, -147.79778  # Bottom-right corner (latitude, longitude)
    # Latitude: 16.66673, Longitude: 103.58196
    # Latitude: 69.74973, Longitude: -120.64459
    # Latitude: -21.09096, Longitude: -119.71009
    # Latitude: -31.32309, Longitude: -147.79778
    # Calculate the other two corners
    coords = [
        (lon1, lat1),  # Top-left
        (lon2, lat1),  # Top-right
        (lon2, lat2),  # Bottom-right
        (lon1, lat2)   # Bottom-left
    ]

    # Create a polygon from the four corners
    polygon = Polygon(coords)

    print(f"Polygon defined: {polygon}")
    plot_polygon_and_positions(polygon, filtered_positions)
    # Filter positions based on whether they fall within the polygon
    filtered_results = filter_satellite_positions_cuda(filtered_positions, polygon)

    print(f"Total positions within polygon: {len(filtered_results)}")

    # Write the filtered results to the output file
    with open(filtered_output_file, 'w') as file:
        for lon, lat, alt in filtered_results:
            file.write(f'{lon:.4f}, {lat:.4f}, {alt:.4f}\n')

    print(f'Filtered results saved to {filtered_output_file}')

if __name__ == '__main__':
    main()
