In [1]:
from traffic.data.samples.calibration import ajaccio

In [4]:
from traffic.core import Traffic

In [10]:
from traffic.data import airports

In [12]:
from ipyleaflet import Map

In [21]:
import pandas as pd

In [35]:
from shapely.geometry import LineString, MultiLineString, Point, Polygon

In [39]:
from traffic.core.geodesy import destination, distance, mrr_diagonal

In [31]:

from traffic.core.structure import Airport
from traffic.data import airports


In [37]:
import numpy as np

In [6]:
# Load the Traffic collection used throughout this notebook from a local
# pickle file (going by its name: taxi trajectories for Zurich take-offs).
# NOTE(review): the path is relative to the notebook's working directory, and
# a .pkl file should only be loaded when it comes from a trusted source.
traf = Traffic.from_file("../../data/intermediate/taxi_zurich_2019_takeoff.pkl")

In [25]:
# Select a single flight from the collection by its identifier; `f` is the
# flight every later cell works on.
f = traf['SWR5220_6069']
# Show the on-ground part of that flight on an interactive Leaflet map.
f.onground().map_leaflet(zoom=13)

Map(center=[47.46626184415817, 8.561359365703353], controls=(ZoomControl(options=['position', 'zoom_in_text', …

In [26]:
# Map centred on the LSZH airport coordinates.
m = Map(center=airports['LSZH'].latlon, zoom=11)
# Split the on-ground part of the flight with the "5T" threshold (presumably
# a 5-minute gap — TODO confirm against Flight.split), then print and draw
# each resulting segment as its own layer.
for f_ in f.onground().split('5T'):
    print(f_.duration, f_.start, f_.stop)
    m.add_layer(f_.leaflet())
# Last expression of the cell: renders the map widget.
m

0 days 00:08:19 2019-11-05 12:57:02+00:00 2019-11-05 13:05:21+00:00
0 days 00:05:43 2019-11-05 16:36:52+00:00 2019-11-05 16:42:35+00:00


Map(center=[47.458056, 8.548056], controls=(ZoomControl(options=['position', 'zoom_in_text', 'zoom_in_title', …

In [43]:
from typing import (
    TYPE_CHECKING,
    Iterable,
    Iterator,
    List,
    Optional,
    Sequence,
    Union,
    cast,
)


def ground_trajectory(
    self, airport: Union[str, "Airport"]
) -> Iterator["Flight"]:
    """Yields the on-ground pieces of the trajectory clipped to an airport.

    The on-ground part of the flight is split with the "5T" threshold, and
    every resulting segment is clipped to the airport. Segments for which
    the clipping returns None are skipped. A single flight may produce
    several pieces, which is why the result is an iterator rather than a
    single Flight.

    :param airport: an Airport instance, or a string key looked up in
        ``traffic.data.airports``.

    :return: a generator of clipped ground segments.
    """

    from traffic.data import airports

    flight = cast("Flight", self)

    # Resolve a string identifier to the corresponding Airport object.
    if isinstance(airport, str):
        target = airports[airport]
    else:
        target = airport

    for segment in flight.onground().split("5T"):
        clipped = segment.clip(target)
        if clipped is None:
            # Nothing left of this segment once clipped to the airport.
            continue
        yield clipped
            

def on_taxiway(
    self,
    airport_or_taxiways: Union[str, pd.DataFrame, "Airport", "Overpass"],
    *,
    tolerance: float = 15,
    tolerance_dist: float = 85,
) -> Iterator["Flight"]:
    """Iterates on segments of trajectory matching a single taxiway label.

    The trajectory is first simplified with ``self.simplify``. Each pair of
    consecutive points of the simplified trajectory is then compared with
    every taxiway (geometries sharing the same ``ref`` label are merged into
    one MultiLineString): both points are projected on the taxiway, and the
    two point-to-projection distances are summed. The taxiway with the
    smallest sum is retained if that sum is below ``tolerance_dist``.
    Consecutive segments matching the same taxiway, and separated by less
    than one minute, are merged before being yielded.

    :param airport_or_taxiways: a string key looked up in ``airports``, an
        Airport (its ``taxiway`` attribute is used), a DataFrame with
        ``ref`` and ``geometry`` columns, or any object exposing such a
        DataFrame as its ``data`` attribute.
    :param tolerance: passed as is to ``self.simplify``.
    :param tolerance_dist: exclusive upper bound on the summed distance
        between the two extremities of a simplified segment and the
        candidate taxiway, in the unit returned by ``distance``
        (presumably metres — TODO confirm).

    :return: a generator of flights restricted to the matching time
        interval, each with an extra ``taxiway`` column holding the label.
        Nothing is yielded when there is no labelled taxiway or when the
        simplified trajectory has fewer than two points.
    """

    self = cast("Flight", self)
    if isinstance(airport_or_taxiways, str):
        airport_or_taxiways = airports[airport_or_taxiways]

    taxiways_ = (
        airport_or_taxiways.taxiway
        if isinstance(airport_or_taxiways, Airport)
        else airport_or_taxiways
    )
    taxiways_df = (
        taxiways_ if isinstance(taxiways_, pd.DataFrame) else taxiways_.data
    )

    taxiways = (  # one entry per taxiway label
        taxiways_df.groupby("ref")
        .agg({"geometry": list})["geometry"]
        .apply(MultiLineString)
        .to_frame()
    )
    if taxiways.empty:
        # No labelled taxiway to match against: idxmin() below would raise.
        return

    simplified_df = cast(
        pd.DataFrame, self.simplify(tolerance=tolerance).data
    )
    if simplified_df.shape[0] < 2:
        return

    def extremities_dist(twy, p1, p2) -> float:
        # Sum of the distances between each extremity of the current
        # simplified segment and its projection on the taxiway.
        p1_proj = twy.interpolate(twy.project(p1))
        p2_proj = twy.interpolate(twy.project(p2))
        d1 = distance(p1_proj.y, p1_proj.x, p1.y, p1.x)
        d2 = distance(p2_proj.y, p2_proj.x, p2.y, p2.x)
        return d1 + d2

    # Maximum time gap between two pieces on the same taxiway for them to be
    # merged ("1min" rather than the deprecated "1T" alias).
    max_gap = pd.Timedelta("1min")

    previous_candidate = None
    first = simplified_df.iloc[0]
    for _, second in simplified_df.iloc[1:].iterrows():

        p1 = Point(first.longitude, first.latitude)
        p2 = Point(second.longitude, second.latitude)

        # One distance per taxiway label, indexed by label. Mapping over the
        # geometry column keeps a 1-D Series and does not depend on how a
        # whole DataFrame would be coerced to an array.
        dists = taxiways["geometry"].map(
            lambda twy: extremities_dist(twy, p1, p2)
        )
        start, stop, ref, dist = (
            first.timestamp,
            second.timestamp,
            dists.idxmin(),
            dists.min(),
        )
        if dist < tolerance_dist:
            candidate = self.assign(taxiway=ref).between(start, stop)
            if previous_candidate is None:
                previous_candidate = candidate

            else:
                prev_ref = previous_candidate.taxiway_max
                delta = start - previous_candidate.stop
                if prev_ref == ref and delta < max_gap:
                    # Same taxiway and close in time: extend the pending
                    # candidate instead of yielding two pieces.
                    previous_candidate = self.assign(taxiway=ref).between(
                        previous_candidate.start, stop
                    )

                else:
                    yield previous_candidate
                    previous_candidate = candidate

        first = second

    # Flush the last pending candidate, if any.
    if previous_candidate is not None:
        yield previous_candidate
            
# Bind the two functions above as methods of the flight instance `f` only
# (the flight's class itself is left untouched), so that they can be called
# as f.ground_trajectory(...) and f.on_taxiway(...) in the next cells.
import types 
f.ground_trajectory = types.MethodType(ground_trajectory, f)
f.on_taxiway = types.MethodType(on_taxiway, f)

In [44]:
# Draw every ground segment of `f` clipped to LSZH, one layer per segment,
# on a fresh map centred on the airport.
m = Map(center=airports['LSZH'].latlon, zoom=11)
for f_ in f.ground_trajectory("LSZH"):
    m.add_layer(f_.leaflet())
# Last expression of the cell: renders the map widget.
m

Map(center=[47.458056, 8.548056], controls=(ZoomControl(options=['position', 'zoom_in_text', 'zoom_in_title', …

In [46]:
# For each piece of `f` matched to a taxiway at LSZH, print the taxiway label
# with the start and stop timestamps of the piece, and draw it on the map.
m = Map(center=airports['LSZH'].latlon, zoom=11)
for f_ in f.on_taxiway("LSZH"):
    # taxiway_max reads the label from the `taxiway` column set by on_taxiway.
    print(f_.taxiway_max, f_.start, f_.stop)
    m.add_layer(f_.leaflet())
# Last expression of the cell: renders the map widget.
m

E 2019-11-05 12:57:04+00:00 2019-11-05 12:59:10+00:00
LINK 3 2019-11-05 12:59:13+00:00 2019-11-05 12:59:32+00:00
INNER 2019-11-05 12:59:34+00:00 2019-11-05 13:00:52+00:00
A 2019-11-05 13:01:51+00:00 2019-11-05 13:04:01+00:00
H2 2019-11-05 16:37:46+00:00 2019-11-05 16:38:56+00:00
J 2019-11-05 16:39:00+00:00 2019-11-05 16:40:12+00:00
A 2019-11-05 16:40:15+00:00 2019-11-05 16:40:57+00:00
F 2019-11-05 16:41:00+00:00 2019-11-05 16:42:34+00:00


Map(center=[47.458056, 8.548056], controls=(ZoomControl(options=['position', 'zoom_in_text', 'zoom_in_title', …

In [None]:
# TODO: ground phases still to identify
#   - at gate
#   - aligned on runway / takeoff from runway
#   - aligned on runway / landing
#   - at gate
#
# use onground()