In [1]:
import warnings
from datetime import datetime, timedelta, timezone
from time import perf_counter
from typing import Optional

import numpy as np
import pandas as pd
from geoalchemy2.shape import to_shape
from geopy import distance
from shapely.geometry import Point
from sqlalchemy.orm import Session

from bloom.container import UseCases
from bloom.domain.excursion import Excursion
from bloom.domain.segment import Segment
from bloom.infra.repositories.repository_task_execution import TaskExecutionRepository
from bloom.logger import logger
from bloom.domain.rel_segment_zone import RelSegmentZone
from bloom.infra.repositories.repository_rel_segment_zone import (
    RelSegmentZoneRepository,
)
from bloom.infra.repositories.repository_port import PortRepository

warnings.filterwarnings("ignore")

# minimal distance to consider a vessel being in a port (in meters)
threshold_distance_to_port = 5000
# maximal average speed of a vessel to check if it's in a port (in knot)
maximal_speed_to_check_if_in_port = 0.1
# average speed to take when vessel exit a port (in knot/h)
average_exit_speed = 7



In [13]:
def to_coords(row: pd.Series) -> pd.Series:
    if pd.isna(row["end_position"]) is False:
        row["longitude"] = row["end_position"].x
        row["latitude"] = row["end_position"].y

    return row


def add_excursion(
    session: Session,
    vessel_id: int,
    departure_at: datetime,
    departure_position: Optional[Point] = None,
) -> int:
    use_cases = UseCases()
    excursion_repository = use_cases.excursion_repository()
    port_repository = use_cases.port_repository()

    result = excursion_repository.get_param_from_last_excursion(session, vessel_id)
    print(result)

    if result:
        arrival_port_id = result["arrival_port_id"]
        arrival_position = (
            to_shape(result["arrival_position"]) if result["arrival_position"] else None
        )
    else:
        arrival_port_id = None
        arrival_position = None

    new_excursion = Excursion(
        vessel_id=vessel_id,
        departure_port_id=arrival_port_id if departure_position is None else None,
        departure_at=departure_at,
        departure_position=(
            arrival_position if departure_position is None else departure_position
        ),
        arrival_port_id=None,
        arrival_at=None,
        arrival_position=None,
        excursion_duration=timedelta(0),
        total_time_at_sea=timedelta(0),
        total_time_in_amp=timedelta(0),
        total_time_in_territorial_waters=timedelta(0),
        total_time_in_zones_with_no_fishing_rights=timedelta(0),
        total_time_fishing=timedelta(0),
        total_time_fishing_in_amp=timedelta(0),
        total_time_fishing_in_territorial_waters=timedelta(0),
        total_time_fishing_in_zones_with_no_fishing_rights=timedelta(0),
        total_time_default_ais=timedelta(0),
    )
    new_excursion = excursion_repository.create_excursion(session, new_excursion)

    return new_excursion.id


def close_excursion(
    session: Session,
    excursion_id: int,
    port_id: int,
    latitude: float,
    longitude: float,
    arrived_at: datetime,
) -> None:
    use_cases = UseCases()
    excursion_repository = use_cases.excursion_repository()
    port_repository = use_cases.port_repository()

    excursion = excursion_repository.get_excursion_by_id(session, excursion_id)

    if excursion:
        excursion.arrival_port_id = port_id
        excursion.arrival_at = arrived_at
        excursion.arrival_position = Point(longitude, latitude)


def get_distance_in_miles(x) -> float:
    p1 = (x.start_latitude, x.start_longitude)
    p2 = (x.end_latitude, x.end_longitude)
    return distance.distance(p1, p2).miles

In [14]:
use_cases = UseCases()
db = use_cases.db()
segment_repository = use_cases.segment_repository()
vessel_position_repository = use_cases.vessel_position_repository()
excursion_repository = use_cases.excursion_repository()
port_repository = use_cases.port_repository()
nb_created_excursion = 0
nb_closed_excursion = 0


In [15]:
with db.session() as session:
    logger.info(f"Lecture des nouvelles positions depuis le")
    batch = vessel_position_repository.get_positions_with_vessel_created_updated_after(
        session, '2024-12-19 15:53:42.403542+00:00'
    )
    logger.info(f"{len(batch)} nouvelles positions")
    last_segment = segment_repository.get_last_vessel_id_segments(session)
    last_segment["longitude"] = None
    last_segment["latitude"] = None
    last_segment = last_segment.apply(to_coords, axis=1)

[bloom INFO @ 17:55:15] Lecture des nouvelles positions depuis le
[bloom INFO @ 17:55:16] 895 nouvelles positions


In [39]:
result = pd.DataFrame()
for vessel_id in batch["vessel_id"].unique():
    batch_segments = pd.DataFrame()
    vessel_last_position = pd.DataFrame()
    df_positions = batch.loc[batch["vessel_id"] == vessel_id].copy()

    vessel_last_position = last_segment.loc[
        last_segment["vessel_id"] == vessel_id,
        [
            "mmsi",
            "timestamp_end",
            "heading_at_end",
            "speed_at_end",
            "end_position",
            "excursion_id",
            "arrival_port_id",
        ],
    ]

    vessel_last_position["end_latitude"] = vessel_last_position["end_position"].apply(
        lambda x: x.y
    )
    vessel_last_position["end_longitude"] = vessel_last_position["end_position"].apply(
        lambda x: x.x
    )
    vessel_last_position.drop("end_position", inplace=True, axis=1)

    vessel_last_position.reset_index(drop=True, inplace=True)

    for index, position in df_positions.iterrows():
        # position = df_positions.iloc[1]
        df = pd.DataFrame()

        if vessel_last_position.shape[0] > 0:
            # if there's a last segment for this vessel, then it's not the first time a position for this vessel is received
            is_new_vessel = False
            if "arrival_port_id" in vessel_last_position:
                if vessel_last_position["arrival_port_id"].iloc[0] >= 0:
                    open_ongoing_excursion = False
                else:
                    open_ongoing_excursion = True
                    ongoing_excursion_id = int(
                        vessel_last_position["excursion_id"].iloc[0]
                    )
            vessel_last_position.rename(
                columns={
                    "end_latitude": "start_latitude",
                    "end_longitude": "start_longitude",
                    "timestamp_end": "timestamp_start",
                    "heading_at_end": "heading_at_start",
                    "speed_at_end": "speed_at_start",
                },
                inplace=True,
            )

            if vessel_last_position.timestamp_start.iloc[0] != position.timestamp:
                new_position = (
                    position[
                        [
                            "timestamp",
                            "heading",
                            "longitude",
                            "latitude",
                            "speed",
                        ]
                    ]
                    .to_frame()
                    .T
                )

                new_position.rename(
                    columns={
                        "timestamp": "timestamp_end",
                        "heading": "heading_at_end",
                        "longitude": "end_longitude",
                        "latitude": "end_latitude",
                        "speed": "speed_at_end",
                    },
                    inplace=True,
                )
                new_position.reset_index(drop=True, inplace=True)
                if "arrival_port_id" in vessel_last_position:
                    vessel_last_position.drop("arrival_port_id", inplace=True, axis=1)
                if "excursion_id" in vessel_last_position:
                    vessel_last_position.drop("excursion_id", inplace=True, axis=1)
                df = pd.concat([vessel_last_position, new_position], axis=1)

                new_position["mmsi"] = position["mmsi"]
                vessel_last_position = new_position

        else:
            # if there's no last segment for this vessel, then it's the first time a position for this vessel is received
            is_new_vessel = True
            open_ongoing_excursion = False

            new_first_position = (
                position[["timestamp", "heading", "longitude", "latitude", "speed"]]
                .to_frame()
                .T
            )

            new_first_position.rename(
                columns={
                    "timestamp": "timestamp_start",
                    "heading": "heading_at_start",
                    "longitude": "start_longitude",
                    "latitude": "start_latitude",
                    "speed": "speed_at_start",
                },
                inplace=True,
            )

            new_last_position = new_first_position.copy()
            for col in new_last_position.columns:
                new_last_position.rename(
                    columns={col: col.replace("start", "end")}, inplace=True
                )
            new_first_position["timestamp_start"] += timedelta(0, -1)

            new_first_position.reset_index(drop=True, inplace=True)
            new_last_position.reset_index(drop=True, inplace=True)
            new_last_position["mmsi"] = position["mmsi"]

            df = pd.concat([new_first_position, new_last_position], axis=1)
            vessel_last_position = new_last_position

        batch_segments = pd.concat([batch_segments, df])

    df = batch_segments

    # reseting index
    df.reset_index(inplace=True, drop=True)
    if df.shape[0] > 0:
        df["distance"] = df.apply(get_distance_in_miles, axis=1)

        # calculate duration in seconds
        def get_duration(x) -> float:
            return (x.timestamp_end - x.timestamp_start).total_seconds()

        df["segment_duration"] = df.apply(get_duration, axis=1)

        # set default type as AT_SEA
        df["type"] = "AT_SEA"

        # set type as default_ais for segment with duration > 35 min
        df.loc[df["segment_duration"] >= 2100, "type"] = "DEFAULT_AIS"

        # calculate average speed in knot
        df["average_speed"] = df["distance"] / (df["segment_duration"] / 3600)

        # set last_vessel_segment
        df["last_vessel_segment"] = 0
        df["last_vessel_segment"].iloc[-1] = 1

        # check if segment ends in a port (only for segment with average_speed < maximal_speed_to_check_if_in_port or with type 'DEFAULT_AIS')

        def get_port(x, session):
            if x.type == "DEFAULT_AIS" or x.average_speed < maximal_speed_to_check_if_in_port:
                res = port_repository.get_closest_port_in_range(
                    session,
                    x.end_longitude,
                    x.end_latitude,
                    threshold_distance_to_port,
                )
                if res:
                    (port_id, distance) = res
                    return port_id
                else:
                    return None
            else:
                return None

        df["port"] = df.apply(get_port, axis=1, args=(session,))

        # get or create new excursion
        # logic :
        # if segment ends in a port while ongoing excursion is open, then we close the excursion
        # else, if the ongoing excursion is open, then we use the ongoing excursion_id for the segment
        # else, we create a new excursion whose id will become the ongoing excursion_id for this segment and the future ones
        # additionnaly, when we create a new excursion, if the vessel is 'new' then we create an 'empty' excursion
        # else, if the first segment of this new excursion is of type 'DEFAULT_AIS', we estimate the time of departure based
        # on its ending position, distance traveled and a given average exit speed
        df["excursion_id"] = np.NaN
        for a in df.index:
            if df["port"].iloc[a] is not None and df["port"].iloc[a] >= 0:
                if open_ongoing_excursion:
                    close_excursion(
                        session,
                        ongoing_excursion_id,
                        int(df["port"].iloc[a]),
                        df["end_latitude"].iloc[a],
                        df["end_longitude"].iloc[a],
                        df["timestamp_end"].iloc[a],
                    )
                    df["excursion_id"].iloc[a] = ongoing_excursion_id
                    open_ongoing_excursion = False
                    nb_closed_excursion += 1
            elif open_ongoing_excursion:
                df["excursion_id"].iloc[a] = ongoing_excursion_id
            else:
                if is_new_vessel:

                    ongoing_excursion_id = add_excursion(
                        session,
                        int(vessel_id),
                        df["timestamp_end"].iloc[a],
                        Point(
                            df["end_longitude"].iloc[a],
                            df["end_latitude"].iloc[a],
                        ),
                    )
                    is_new_vessel = False
                    nb_created_excursion += 1
                else:
                    def get_time_of_departure():
                        if df["type"].iloc[a] == "DEFAULT_AIS":
                            return df["timestamp_end"].iloc[a] - timedelta(
                                0,
                                3600 * df["distance"].iloc[a] / average_exit_speed,
                            )
                        else:
                            return df["timestamp_start"].iloc[a]

                    ongoing_excursion_id = add_excursion(
                        session, int(vessel_id), get_time_of_departure()
                    )  # put the create new excursion function here
                    nb_created_excursion += 1
                open_ongoing_excursion = True
                df["excursion_id"].iloc[a] = ongoing_excursion_id
        # concat the result for current vessel in the result dataframe
        if df.shape[0] > 0:
            result = pd.concat([result, df[df["excursion_id"] >= 0]], axis=0)
logger.info(f"{nb_created_excursion} excursion(s) créées")
logger.info(f"{nb_closed_excursion} excursion(s) cloturés")
logger.info("Création des segments")
result.reset_index(drop=True, inplace=True)
new_segments = []
for i in result.index:
    new_segment = Segment(
        excursion_id=result["excursion_id"].iloc[i],
        timestamp_start=result["timestamp_start"].iloc[i],
        timestamp_end=result["timestamp_end"].iloc[i],
        segment_duration=result["segment_duration"].iloc[i],
        start_position=Point(
            result["start_longitude"].iloc[i], result["start_latitude"].iloc[i]
        ),
        end_position=Point(
            result["end_longitude"].iloc[i], result["end_latitude"].iloc[i]
        ),
        distance=result["distance"].iloc[i],
        average_speed=result["average_speed"].iloc[i],
        speed_at_start=result["speed_at_start"].iloc[i],
        speed_at_end=result["speed_at_end"].iloc[i],
        heading_at_start=result["heading_at_start"].iloc[i],
        heading_at_end=result["heading_at_end"].iloc[i],
        type=result["type"].iloc[i],
        last_vessel_segment=result["last_vessel_segment"].iloc[i],
        in_zone_with_no_fishing_rights=False,
        in_amp_zone=False,
        in_territorial_waters=False,
    )
    new_segments.append(new_segment)
segment_repository.batch_create_segment(session, new_segments)
logger.info(f"{len(new_segments)} segment(s) créés")    

{'arrival_port_id': None, 'arrival_position': None}
{'arrival_port_id': None, 'arrival_position': None}


[bloom INFO @ 18:19:41] 4 excursion(s) créées
[bloom INFO @ 18:19:41] 12 excursion(s) cloturés
[bloom INFO @ 18:19:41] Création des segments
[bloom INFO @ 18:19:42] 316 segment(s) créés


In [44]:
batch[batch["mmsi"] == 238602140]

Unnamed: 0,id,timestamp,accuracy,collection_type,course,heading,position,longitude,latitude,rot,speed,created_at,vessel_id,mmsi
54,27591255,2024-12-19 15:28:31+00:00,LOW,DYNAMIC,0.0,,POINT (16.46560167 43.30532167),16.465602,43.305322,,0.009955,2024-12-19 15:53:54.019238+00:00,1516,238602140


In [41]:
result[result["port"] > 0]

Unnamed: 0,mmsi,timestamp_start,heading_at_start,speed_at_start,start_latitude,start_longitude,timestamp_end,heading_at_end,end_longitude,end_latitude,speed_at_end,distance,segment_duration,type,average_speed,last_vessel_segment,port,excursion_id
21,238602140,2024-12-19 15:13:21+00:00,,0.706194,43.30536,16.465625,2024-12-19 15:28:31+00:00,,16.465602,43.305322,0.009955,0.002896,910.0,AT_SEA,0.011455,1,949,136961.0
80,224014890,2024-12-19 15:30:01+00:00,254.0,4.002347,42.255448,3.179082,2024-12-19 15:45:00+00:00,0.0,3.179075,42.25546,0.003045,0.000875,899.0,AT_SEA,0.003504,1,4267,137123.0
128,224185250,2024-12-19 15:12:42+00:00,,4.670447,40.4622,0.475673,2024-12-19 15:28:02+00:00,,0.475643,40.462185,0.006426,0.00189,920.0,AT_SEA,0.007394,1,4211,137342.0
171,224138770,2024-12-19 14:46:08+00:00,,2.638201,39.772113,4.495998,2024-12-19 15:40:26+00:00,,4.37153,39.856252,8.455773,8.80631,3258.0,DEFAULT_AIS,9.73073,1,4325,137473.0
173,224073560,2024-12-19 15:30:00+00:00,275.0,4.442795,41.21581,1.735357,2024-12-19 15:45:01+00:00,275.0,1.735333,41.2158,0.004855,0.001398,901.0,AT_SEA,0.005587,1,4259,137484.0
211,228071900,2024-12-19 14:42:30+00:00,229.0,0.046345,-4.62,55.46,2024-12-19 15:00:27+00:00,229.0,55.46,-4.62,0.0,0.0,1077.0,AT_SEA,0.0,1,4173,138208.0
225,228043800,2024-12-19 15:30:01+00:00,24.0,3.979279,42.234097,-8.7354,2024-12-19 15:45:02+00:00,24.0,-8.735413,42.23409,0.002862,0.000824,901.0,AT_SEA,0.003293,1,4230,129391.0
248,204814000,2024-12-19 14:45:07+00:00,293.0,0.04105,37.737465,-25.655653,2024-12-19 15:45:00+00:00,50.0,-25.670325,37.736397,0.702588,0.806952,3593.0,DEFAULT_AIS,0.808524,1,3992,138210.0


In [17]:
session.commit()


In [77]:
# check if segment ends in a port (only for segment with average_speed < maximal_speed_to_check_if_in_port or with type 'DEFAULT_AIS')
def get_port(x, session):
    if (
        x.type == "DEFAULT_AIS"
        or x.average_speed < maximal_speed_to_check_if_in_port
    ):
        res = port_repository.get_closest_port_in_range(
            session,
            x.end_longitude,
            x.end_latitude,
            threshold_distance_to_port,
        )
        if res:
            (port_id, distance) = res
            return port_id
        else:
            return None
    else:
        return None

df["port"] = df.apply(get_port, axis=1, args=(session,))

# get or create new excursion
# logic :
# if segment ends in a port while ongoing excursion is open, then we close the excursion
# else, if the ongoing excursion is open, then we use the ongoing excursion_id for the segment
# else, we create a new excursion whose id will become the ongoing excursion_id for this segment and the future ones
# additionnaly, when we create a new excursion, if the vessel is 'new' then we create an 'empty' excursion
# else, if the first segment of this new excursion is of type 'DEFAULT_AIS', we estimate the time of departure based
# on its ending position, distance traveled and a given average exit speed
df["excursion_id"] = np.NaN
for a in df.index:
    if df["port"].iloc[a] is not None and df["port"].iloc[a] >= 0:
        if open_ongoing_excursion:
            close_excursion(
                session,
                ongoing_excursion_id,
                int(df["port"].iloc[a]),
                df["end_latitude"].iloc[a],
                df["end_longitude"].iloc[a],
                df["timestamp_end"].iloc[a],
            )
            df["excursion_id"].iloc[a] = ongoing_excursion_id
            open_ongoing_excursion = False
            nb_closed_excursion += 1
    elif open_ongoing_excursion:
        df["excursion_id"].iloc[a] = ongoing_excursion_id
    else:
        if is_new_vessel:
            ongoing_excursion_id = add_excursion(
                session,
                int(vessel_id),
                df["timestamp_end"].iloc[a],
                Point(
                    df["end_longitude"].iloc[a],
                    df["end_latitude"].iloc[a],
                ),
            )
            is_new_vessel = False
            nb_created_excursion += 1
        else:

            def get_time_of_departure():
                if df["type"].iloc[a] == "DEFAULT_AIS":
                    return df["timestamp_end"].iloc[a] - timedelta(
                        0,
                        3600 * df["distance"].iloc[a] / average_exit_speed,
                    )
                else:
                    return df["timestamp_start"].iloc[a]

            ongoing_excursion_id = add_excursion(
                session, int(vessel_id), get_time_of_departure()
            )  # put the create new excursion function here
            nb_created_excursion += 1
        open_ongoing_excursion = True
        df["excursion_id"].iloc[a] = ongoing_excursion_id
# concat the result for current vessel in the result dataframe
if df.shape[0] > 0:
    result = pd.concat([result, df[df["excursion_id"] >= 0]], axis=0)
    logger.info(f"{nb_created_excursion} excursion(s) créées")
    logger.info(f"{nb_closed_excursion} excursion(s) cloturés")
    logger.info("Création des segments")
    result.reset_index(drop=True, inplace=True)
    new_segments = []
    for i in result.index:
        new_segment = Segment(
        excursion_id=result["excursion_id"].iloc[i],
        timestamp_start=result["timestamp_start"].iloc[i],
        timestamp_end=result["timestamp_end"].iloc[i],
        segment_duration=result["segment_duration"].iloc[i],
        start_position=Point(
            result["start_longitude"].iloc[i], result["start_latitude"].iloc[i]
        ),
        end_position=Point(
            result["end_longitude"].iloc[i], result["end_latitude"].iloc[i]
        ),
        distance=result["distance"].iloc[i],
        average_speed=result["average_speed"].iloc[i],
        speed_at_start=result["speed_at_start"].iloc[i],
        speed_at_end=result["speed_at_end"].iloc[i],
        heading_at_start=result["heading_at_start"].iloc[i],
        heading_at_end=result["heading_at_end"].iloc[i],
        type=result["type"].iloc[i],
        last_vessel_segment=result["last_vessel_segment"].iloc[i],
        in_zone_with_no_fishing_rights=False,
        in_amp_zone=False,
        in_territorial_waters=False,
    )
    new_segments.append(new_segment)
    # segment_repository.batch_create_segment(session, new_segments)
logger.info(f"{len(new_segments)} segment(s) créés")


[bloom INFO @ 16:14:37] 76 excursion(s) créées
[bloom INFO @ 16:14:37] 76 excursion(s) cloturés
[bloom INFO @ 16:14:37] Création des segments
[bloom INFO @ 16:14:37] 1 segment(s) créés


In [78]:
new_segments

[Segment(id=None, excursion_id=138293, timestamp_start=Timestamp('2024-12-19 15:27:18+0000', tz='UTC'), timestamp_end=Timestamp('2024-12-19 15:44:00+0000', tz='UTC'), segment_duration=datetime.timedelta(seconds=1002), start_position=<POINT (-17.414 19.932)>, end_position=<POINT (-17.445 19.976)>, course=None, distance=3.667722095480924, average_speed=13.177444654422482, speed_at_start=11.32256602722433, speed_at_end=11.45088626036816, heading_at_start=314.0, heading_at_end=330.0, type='AT_SEA', in_amp_zone=False, in_territorial_waters=False, in_zone_with_no_fishing_rights=False, last_vessel_segment=True, created_at=None, updated_at=None)]

In [None]:
# On vide un peu la mémoire
session.flush()
new_segments = None
df = None

# Recherche des zones et calcul / mise à jour des stats
logger.info(
    "Mise en relation des segments avec les zones et calcul des statistiques d'excursion"
)
result = segment_repository.find_segments_in_zones_created_updated_after(
    session, "2024-12-19 15:53:42.403542+00:00"
)
new_rels = []
excursions = {}
segments = []
new_metricss = []
max_created_updated = "2024-12-19 15:53:42.403542+00:00"
i = 0
for segment, zones in result.items():
    segment_in_zone = False
    vessel_attributes = (
        segment_repository.get_vessel_attribute_by_segment_created_updated_after(
            session, segment.id, "2024-12-19 15:53:42.403542+00:00"
        )
    )  # metrics_repository.get_vessel_excursion_segment_by_id(session,segment.id) #1
    types = "AT_SEA"
    zones_names = []
    for zone in zones:
        if segment.type == "DEFAULT_AIS":
            # Issue 234: ne pas créer les relations pour les segments en default AIS
            types = "DEFAULT_AIS"
            continue
        segment_in_zone = True
        new_rels.append(RelSegmentZone(segment_id=segment.id, zone_id=zone.id))
        if zone.category == "amp":
            segment.in_amp_zone = True
            types = "in_amp"
        elif zone.category == "Fishing coastal waters (6-12 NM)":
            country_iso3 = vessel_attributes.country_iso3
            beneficiaries = zone.json_data.get("beneficiaries", [])
            if country_iso3 not in beneficiaries:
                segment.in_zone_with_no_fishing_rights = True
                types = "in_zone_with_no_fishing_rights"
        elif zone.category == "Clipped territorial seas":
            country_iso3 = vessel_attributes.country_iso3
            if country_iso3 != "FRA":
                segment.in_zone_with_no_fishing_rights = True
                types = "in_zone_with_no_fishing_rights"
        elif zone.category == "Territorial seas":
            segment.in_territorial_waters = True
            types = "in_territorial_water"  # 1
        # elif zone.category == "white zone":    #prospectif
        #    segment.in_white_zone = True
        #    types="white_zone"
        # duration_total_seconds = segment.segment_duration.total_seconds()