In [1]:
from ipyleaflet import Map, basemaps, Circle, Marker, Icon, MeasureControl, GeoData, AwesomeIcon, GeoJSON, AntPath, Polygon
from ipyleaflet.velocity import Velocity
import geopandas as gpd
from sidecar import Sidecar
import re
import math
import xarray as xr
import json
import threading
import queue
import time
import rtlsdr
import pyModeS
import random

from typing import List, Tuple, Dict, Optional


def calculate_destination_coordinates(lat, lon, cap, dist):
    # Earth radius in meters
    earth_radius = 6371000.0

    # Convert angles to radians
    lat_rad = math.radians(lat)
    lon_rad = math.radians(lon)
    cap_rad = math.radians(cap)

    # Calculate the destination latitude and longitude
    new_lat_rad = math.asin(math.sin(lat_rad) * math.cos(dist / earth_radius) +
                            math.cos(lat_rad) * math.sin(dist / earth_radius) * math.cos(cap_rad))

    new_lon_rad = lon_rad + math.atan2(math.sin(cap_rad) * math.sin(dist / earth_radius) * math.cos(lat_rad),
                                       math.cos(dist / earth_radius) - math.sin(lat_rad) * math.sin(new_lat_rad))

    # Convert radians back to degrees
    new_lat = math.degrees(new_lat_rad)
    new_lon = math.degrees(new_lon_rad)

    return new_lat, new_lon


def dms_to_decimal(coord_string):
    # Regular expression to extract values
    pattern = re.compile(r'(\d+)° (\d+)\' (\d+\.\d+)\" ([NSWE])')
    match = pattern.match(coord_string)

    if match:
        degrees, minutes, seconds, direction = match.groups()
        
        # Convert to decimal format
        decimal_coord = float(degrees) + float(minutes)/60 + float(seconds)/3600

        # Adjust for negative values (South or West)
        if direction in ['S', 'W']:
            decimal_coord = -decimal_coord

        return decimal_coord
    else:
        raise ValueError("Invalid coordinate string format")

def decimal_to_dms(latitude, longitude):
    # Function to convert decimal degrees to DMS format
    def dd_to_dms(degrees, is_latitude):
        direction = 'N' if is_latitude and degrees >= 0 else 'S'
        direction = 'E' if not is_latitude and degrees >= 0 else 'W'

        degrees = math.fabs(degrees)
        minutes, seconds = divmod(degrees * 3600, 60)
        degrees, minutes = divmod(minutes, 60)

        return f"{int(degrees)}° {int(minutes)}' {seconds:.2f}\" {direction}"

def compute_weight(direction: float):
    dir_rad = math.radians(direction)
    return max(0.4, (2 + math.fabs(math.cos(dir_rad) * math.exp(1 - math.cos(dir_rad - math.pi))) - math.fabs(math.sin(dir_rad))) * math.fabs(math.cos(2 * dir_rad) - 0.4))


In [2]:
from dataclasses import dataclass

# Class to store XWing Plane's current state
@dataclass
class XWingPlaneState:
    latitude: float
    longitude: float
    altitude: float
    roll: float
    pitch: float
    yaw: float
    route: float
    heading: float
    speed_north: float
    speed_east: float
    speed_vertical: float

# Class to store data for "danger zones"
class DangerZone:
    # Delay before activating termination system (in seconds)
    _delay_to_termination: float
    # Input state used to calculate the danger zone
    _xwing_plane_state: XWingPlaneState
    # Coordinates defining the contour of the zone (100 points)
    _contour_points: List[Tuple[float, float]]
    # G in m/s²
    _g: float
    
    def __init__(self,
             p_delay_to_termination: float,
             p_xwing_plane_state: XWingPlaneState):
        self._g = 9.81
        self._delay_to_termination = p_delay_to_termination
        self._xwing_plane_state = p_xwing_plane_state
        self._contour_points = self.compute_contour()
    
    def compute_contour(self):
        # Implement here the logic to compute the contour of the "danger zone"
        incertainty_factor = 1 + math.sqrt(self._delay_to_termination) / 10
        horizontal_speed = math.sqrt(self._xwing_plane_state.speed_north ** 2 + self._xwing_plane_state.speed_east ** 2)
        vertical_speed = self._xwing_plane_state.speed_vertical
        bank_angle = self._xwing_plane_state.roll
        turning_rate = math.degrees(self._g * math.tan(math.radians(bank_angle)) / horizontal_speed) if (bank_angle != 0 and horizontal_speed != 0) else 0
        delta_route = turning_rate * self._delay_to_termination
        projected_route = self._xwing_plane_state.route + delta_route
        distance = horizontal_speed * self._delay_to_termination if delta_route == 0 else 2 * (horizontal_speed ** 2) * math.sin(math.radians(delta_route/2))/ (self._g * math.tan(math.radians(bank_angle)))
        plane_estimated_position = calculate_destination_coordinates(self._xwing_plane_state.latitude, self._xwing_plane_state.longitude, projected_route, distance)
        plane_estimated_altitude = self._xwing_plane_state.altitude + self._delay_to_termination * vertical_speed
        maximal_free_fall_range = (horizontal_speed * math.sqrt(2) / (2 * self._g)) * (horizontal_speed * math.sqrt(2) / 2 + math.sqrt(horizontal_speed ** 2 / 2 + 2 * self._g * plane_estimated_altitude))
        return [calculate_destination_coordinates(plane_estimated_position[0], plane_estimated_position[1], projected_route + x, incertainty_factor * maximal_free_fall_range * compute_weight(x)) for x in range(0, 360, 5)]
    

In [3]:
# Class to simulate the processing and sending of data to update the Map and its elements
class SimulatedSendingThread(threading.Thread):
    # Queue
    _queue: queue.Queue
    # Count of many iterations have been done (used to simulatedifferent phases of a flight)
    _counter: int
    # Interval between sending 2 messages
    _interval: float
    # XWing Plane State - previous
    _xwing_previous_state: XWingPlaneState
    # XWing Plane State - updated
    _xwing_updated_state: XWingPlaneState
    # Danger Zones:
    _danger_zones: Dict[int, DangerZone]
    # G in m/s²
    _g: float

    def __init__(self,
                 p_queue: queue.Queue,
                 p_interval: float):
        threading.Thread.__init__(self)
        self._queue = p_queue
        self._interval = p_interval
        self._counter = 0
        self._g = 9.81
        dms_lat_cazaux = "44° 31' 41.33\" N"
        dms_lon_cazaux = "1° 08' 15.95\" W"
        self._danger_zones = {}
        self._xwing_previous_state = XWingPlaneState(latitude=dms_to_decimal(dms_lat_cazaux),
                                                     longitude=dms_to_decimal(dms_lon_cazaux),
                                                     altitude=0,
                                                     roll=0,
                                                     pitch=0,
                                                     yaw=0,
                                                     route=56.4,
                                                     heading=56.4,
                                                     speed_north=0,
                                                     speed_east=0,
                                                     speed_vertical=0)
        self._xwing_updated_state = self._xwing_previous_state
        self.daemon = True  # Set as daemon to exit with the main program

    def run(self):
        # Simulate some background work and update the queue
        while True:
            message = self.compute_message()
            self._queue.put(message)
            self._counter += 1
             # Sleep for a while (simulating a delay between updates)
            time.sleep(self._interval)

    def compute_message(self):
        self._xwing_updated_state = self.simulate_update_plane_state()
        self.compute_danger_zones()
        message = {'state': self._xwing_updated_state, 'danger_zones': self._danger_zones}
        self._xwing_previous_state = self._xwing_updated_state
        return message

    def simulate_update_plane_state(self):
        # Implement here the logic for simulating the update of the plane's state (position, speed, attitude...)
        previous_state = self._xwing_previous_state
        updated_state = self._xwing_previous_state
        alt_rate_climb = 20
        alt_rate_des = -5
        heading_rate = random.uniform(-2, 2)
        bank_angle_rate_right = 2
        bank_angle_rate_left = -2
        if 30 < self._counter < 230:
            # Don't do anything for 3 seconds, then take-off
            acceleration = 4
            speed = acceleration * self._counter * self._interval
            delta_distance = (acceleration * (2 * self._counter + 1) * self._interval ** 2) / 2
            new_latitude, new_longitude = calculate_destination_coordinates(previous_state.latitude,
                                                                            previous_state.longitude,
                                                                            previous_state.route,
                                                                            delta_distance)
            updated_state.latitude = new_latitude
            updated_state.longitude = new_longitude
            updated_state.speed_north = speed * math.cos(math.radians(updated_state.route))
            updated_state.speed_east = speed * math.sin(math.radians(updated_state.route))
        elif self._counter > 230:
            phase = (self._counter - 230) % 1000
            if phase < 400:
                # Turn clock-wise and climb
                speed = math.sqrt(previous_state.speed_north ** 2 + previous_state.speed_east ** 2)
                updated_state.speed_vertical = alt_rate_climb
                updated_state.altitude = previous_state.altitude + alt_rate_climb * self._interval
                updated_state.roll = min(60, bank_angle_rate_right * self._interval * (100 + (200/math.pi) * math.asin(math.sin(math.pi * (phase - 100)/200))))
                delta_route = self._interval * math.degrees(self._g * math.tan(math.radians(updated_state.roll)) / speed)
                updated_state.route = previous_state.route + delta_route
                updated_state.heading = previous_state.heading + delta_route
                new_latitude, new_longitude = calculate_destination_coordinates(previous_state.latitude,
                                                                                previous_state.longitude,
                                                                                updated_state.route,
                                                                                speed * self._interval)
                updated_state.latitude = new_latitude
                updated_state.longitude = new_longitude
                updated_state.speed_north = speed * math.cos(math.radians(updated_state.route))
                updated_state.speed_east = speed * math.sin(math.radians(updated_state.route))
            elif 400 <= phase < 700:
                # Just go straight
                speed = math.sqrt(previous_state.speed_north ** 2 + previous_state.speed_east ** 2)
                new_latitude, new_longitude = calculate_destination_coordinates(previous_state.latitude,
                                                                                previous_state.longitude,
                                                                                previous_state.route,
                                                                                speed * self._interval)
                updated_state.latitude = new_latitude
                updated_state.longitude = new_longitude 
            elif 700 <= phase < 850:
                # Turn counter clock-wise and drop
                speed = math.sqrt(previous_state.speed_north ** 2 + previous_state.speed_east ** 2)
                updated_state.speed_vertical = alt_rate_des
                updated_state.altitude = previous_state.altitude + alt_rate_des * self._interval
                updated_state.roll = max(-60, -1 * bank_angle_rate_right * self._interval * (100 + (200/math.pi) * math.asin(math.sin(math.pi * (phase - 887.5)/75))))
                delta_route = self._interval * math.degrees(self._g * math.tan(math.radians(updated_state.roll)) / speed)
                updated_state.route = previous_state.route + delta_route
                updated_state.heading = previous_state.heading + delta_route
                new_latitude, new_longitude = calculate_destination_coordinates(previous_state.latitude,
                                                                                previous_state.longitude,
                                                                                updated_state.route,
                                                                                speed * self._interval)
                updated_state.latitude = new_latitude
                updated_state.longitude = new_longitude
                updated_state.speed_north = speed * math.cos(math.radians(updated_state.route))
                updated_state.speed_east = speed * math.sin(math.radians(updated_state.route))
            else:
                # Go straight and heading oscillates 
                speed = math.sqrt(previous_state.speed_north ** 2 + previous_state.speed_east ** 2)
                new_latitude, new_longitude = calculate_destination_coordinates(previous_state.latitude,
                                                                                previous_state.longitude,
                                                                                previous_state.route,
                                                                                speed * self._interval)
                updated_state.latitude = new_latitude
                updated_state.longitude = new_longitude
                updated_state.heading = previous_state.heading + heading_rate * self._interval
        return updated_state

    def compute_danger_zones(self):
        for termination_delay in [0, 5, 15]:
            danger_zone = DangerZone(termination_delay, self._xwing_updated_state)
            self._danger_zones[termination_delay] = danger_zone
            
 

In [4]:
# Thread to display and update the Map
class MapThread(threading.Thread):
    # Queue
    _queue: queue.Queue
    # The Map
    _map: Map
    # XWing Plane Icon
    _xwing_plane_icon: Icon
    # XWing Plane Marker
    _xwing_plane_marker: Optional[Marker]
    # Danger Zones
    _danger_zone_0: Optional[Polygon]
    _danger_zone_5: Optional[Polygon]
    _danger_zone_15: Optional[Polygon]

    def __init__(self,
                 p_queue: queue.Queue):
        threading.Thread.__init__(self)
        self.daemon = True
        self._queue = p_queue
        dms_lat_cazaux = "44° 31' 41.33\" N"
        dms_lon_cazaux = "1° 08' 15.95\" W"
        center = [dms_to_decimal(dms_lat_cazaux), dms_to_decimal(dms_lon_cazaux)]

        self._map = Map(basemap=basemaps.OpenStreetMap.Mapnik,
                center=center,
                zoom=12,
                scroll_wheel_zoom=True,
                interpolation='nearest')
        # Icon for the XWING Plane
        self._xwing_plane_icon = Icon(
                                icon_url='http://localhost:8888/files/xwing_red_xs.svg?_xsrf=2%7Caf9410d2%7C287e68d8c54a4f39fe32864d93e70913%7C1706357870',
                                icon_size=[40, 40],
                                icon_anchor=[20, 20]
                                )
        
        self.inhabitated_zones()
        self.extra()

        self._xwing_plane_marker = None
        self._danger_zone_0 = None
        self._danger_zone_5 = None
        self._danger_zone_15 = None
        
        sc = Sidecar(title="XWING Map")
        with sc:
            display(self._map)

    def run(self):
        # Method to update the map
        while True:
            if not self._queue.empty():
                message = self._queue.get()
                # XWing Plane Marker Position
                state = message['state']
                xwing_location = [state.latitude, state.longitude]
                if self._xwing_plane_marker is None:
                    self._xwing_plane_marker = Marker(location=xwing_location,
                                                      draggable=False,
                                                      icon=self._xwing_plane_icon,
                                                      rotation_angle=state.heading,
                                                      rotation_origin='20px 20px')
                    self._map.add(self._xwing_plane_marker)
                else:
                    self._xwing_plane_marker.location = xwing_location
                    self._xwing_plane_marker.rotation_angle = state.heading
                # Danger Zones
                danger_zones = message['danger_zones']
                danger_zone_0 = danger_zones[0]
                danger_zone_5 = danger_zones[5]
                danger_zone_15 = danger_zones[15]
                if self._danger_zone_15 is None:
                    self._danger_zone_15 = Polygon(locations=danger_zone_15._contour_points,
                                                   color='red',
                                                   fill_color='red')
                    self._map.add(self._danger_zone_15)
                else:
                    self._danger_zone_15.locations = danger_zone_15._contour_points
                    
                if self._danger_zone_5 is None:
                    self._danger_zone_5 = Polygon(locations=danger_zone_5._contour_points,
                                                  color='purple',
                                                  fill_color='purple')
                    self._map.add(self._danger_zone_5)
                else:
                    self._danger_zone_5.locations = danger_zone_5._contour_points
                    
                if self._danger_zone_0 is None:
                    self._danger_zone_0 = Polygon(locations=danger_zone_0._contour_points,
                                                 color='green',
                                                 fill_color='green')
                    self._map.add(self._danger_zone_0)
                else:
                    self._danger_zone_0.locations = danger_zone_0._contour_points
                
            time.sleep(0.1)

    def inhabitated_zones(self):
        # Display habitated zones from a GeoJSON file 
        # This example GeoJSON file is the result of a conversion of a KML file obtained from drawing zones in Goggle My Maps/ Google Earth
        # It seems the raw KML format is not well supported (yet)
        with open('habitations.geojson','r') as f:
            data = json.load(f)
        geo_json = GeoJSON(data=data,
                           style = {'color': 'Red',
                                    'opacity':1,
                                    'weight':1.9,
                                    'dashArray':'9',
                                    'fillOpacity':0.3}
                          )
        self._map.add(geo_json)

    def extra(self):
        # Measure Control (measure distances/areas). Has issues though
        measure = MeasureControl(
            position='bottomleft',
            active_color = 'orange',
            primary_length_unit = 'kilometers'
        )
        measure.add_area_unit('sqkmeters', 1000000, 4)
        measure.primary_area_unit='sqkmeters'
        measure.completed_color = 'red'
        
        # Just to show we can add Layer for wind
        ds = xr.open_dataset("wind-global.nc")
        display_options = {
            "velocityType": "Global Wind",
            "displayPosition": "bottomright",
            "displayEmptyString": "No wind data",
        }
        wind = Velocity(
            data=ds,
            zonal_speed="u_wind",
            meridional_speed="v_wind",
            latitude_dimension="lat",
            longitude_dimension="lon",
            velocity_scale=0.01,
            max_velocity=20,
            display_options=display_options,
        )
        self._map.add(wind)
        self._map.add(measure)
        


In [None]:
my_queue = queue.Queue()
update_thread = SimulatedSendingThread(my_queue, 0.1)
map_thread = MapThread(my_queue)

update_thread.start()
map_thread.start()

update_thread.join()
map_thread.join()