# Import libraries

In [None]:
import numpy as np
from sgp4.api import Satrec, jday
import pyproj
from timeit import default_timer as timer
import dask.delayed
import dask.array as da
from dask.diagnostics import ProgressBar, Profiler, ResourceProfiler, visualize
#dask.config.set(scheduler='threads')
"""
# One can create a particular Client depending on machine capability or multiple machines. Then, use the client to compute dask.delayed objects

from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers = 4, threads_per_worker = 1)
c = Client(cluster)
c.cluster
"""

# Satellite count

In [None]:
COUNT = 30

# Functions to create Time Series (JD, FR) from datetime

In [None]:
def create_time_series(start_date, end_date):
    start_date = np.datetime64(start_date)
    end_date = np.datetime64(end_date)

    # for 1 sec intervals
    #time_series = np.arange(start_date, end_date, np.timedelta64(1, 's'))
    # for 0.1 sec intervals
    time_series = np.arange(start_date, end_date, np.timedelta64(100, 'ms'))
    
    return time_series

def JD_FR(time_series):
    # creating a wrapper function for map
    def jday_wrap(instance):
        dt = instance.astype(object)
        return jday(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second)
    # return JD, FR
    return map(np.array, zip(*map(jday_wrap, time_series)))

# Delayed functions (parallelized) for coordinate transformation using Pyproj

In [None]:
# function for x, y, z coordinates from satellite object
@dask.delayed(nout = 1, pure = True)
def dist(satellite, JD, FR):
    # satellite.sgp4_array returns e, r, v. We are interested in r.
    return satellite.sgp4_array(JD, FR)[1]

# function to calculate Longitude, Latitude, Altitude
# describing the transformer outside the function avoids creating the same object (encapsulation of transformer parameters) repeatedly
transformer = pyproj.Transformer.from_crs({"proj":'geocent', "ellps":'WGS84', "datum":'WGS84'}, {"proj":'latlong', "ellps":'WGS84', "datum":'WGS84'})
# use delayed decorator for parallization
@dask.delayed(nout = 3, pure = True)
def ecef2lla(pos_x, pos_y, pos_z):
    # return long, lati, alti 
    return [transformer.transform(pos_x, pos_y, pos_z, radians = False)]

# Function to read target file and create satellite objects using Pyproj

In [None]:
def create_satellite_array(file):
    count = COUNT
    satellite_array = []
    # use context manager to optimally open/close files
    with open(file, 'r') as sat:
        while count > 0:
            s = sat.readline()
            t = sat.readline()
        
            satellite_array.append(Satrec.twoline2rv(s, t))
            count -= 1
    return satellite_array

# Setup for user-defined region to filter LLA

In [None]:
#region latitude and longitude coordinates
#the implemented search is naive and equivalent to a rectangular area of search, i.e, larger than or equal to exact
#the area chosen is small to receive fewer final results
la_1, lo_1 = 16.66673, 18.58196
la_2, lo_2 = 19.74973, 17.64459
la_3, lo_3 = 17.09096, 19.71009
la_4, lo_4 = 18.32309, 16.79778

la_min = min(la_1, la_2, la_3, la_4)
la_max = max(la_1, la_2, la_3, la_4)
lo_min = min(lo_1, lo_2, lo_3, lo_4)
lo_max = max(lo_1, lo_2, lo_3, lo_4)

#function to filter long and lat that fall in the region
def filter_lon_lat(lo, la):
    return la < la_max and la > la_min and lo < lo_max and lo > lo_min

# Optional function to write filtered results to file

In [None]:
def write_to_file(satellite_num, L, Lo, La, time_series):
    with open("Final_result", 'a') as f:
        for j in range(L):
            if filter_lon_lat(Lo[j], La[j]):
                f.write("{} | {} | {} | {}\n".format(satellite_num+1, time_series[j], Lo[j], La[j]))

# Main

In [None]:
if __name__ == "__main__":
    
    start_date = '2023-01-01T00:00:00'
    end_date = '2023-01-06T00:00:00'
    # file in same directory as jupyter notebook
    #file = '30sats.txt'
    file = '27000sats.txt'
    
    start = timer()
    copy = start
    time_series = create_time_series(start_date, end_date)
    print("Creating time_series:", timer()-start)

    start = timer()
    JD, FR = JD_FR(time_series)
    #print(L)
    print("Creating JD, FR:", timer()-start)
    L = len(time_series)
    
    start = timer()
    satellite_array = create_satellite_array(file)
    print("Creating satellite objects:", timer()-start)

    results = []
    for i in range(COUNT):
        r = 1000*dist(satellite_array[i], JD, FR).ravel() 
        result = ecef2lla(r[0::3], r[1::3], r[2::3])
        results.append(result)
    #print(results)
    
    # calculate delayed objects in parallel
    start = timer()

    # compute inside the context of dask workers, CPU, and RAM profilers
    with ProgressBar(), Profiler() as prof, ResourceProfiler(dt = 0.25) as rprof:
        results = dask.compute(*results)
    print("Creating LLA:", timer()-start)
    #print(list(results)[0])

    #"""
    # filtering and printing results to file
    with open("Final_result", 'w') as f:
        f.write("Satellite | Time | Longitude | Latitude\n")
    for i in range(COUNT):
        (Lo, La, Al) = list(results)[i][0]
        #print(Lo, La)
        write_to_file(i, L, Lo, La, time_series)
    #"""
    print("Total:", timer() - copy)


# Graph of each parallel task

In [None]:
result.dask

# Resource Breakdown

In [None]:
from bokeh.plotting import figure, show
from bokeh.io import output_notebook
output_notebook()
prof.visualize()
rprof.visualize()