### Import packages and set paths

In [None]:
import os

import dask.dataframe as dd
import dask_gateway
import dask.distributed

import dotenv
import warnings
warnings.filterwarnings('ignore')

import matplotlib.pyplot as plt
import geopandas
from shapely.geometry import Polygon, LineString, Point, MultiPolygon
from shapely.ops import transform, cascaded_union
import numpy as np
import movingpandas as mpd
import datetime
import pandas as pd
import geopandas as gpd
import os
import pyproj
import scipy
import pyarrow as pa
import pickle

Set the path to the AIS data

In [None]:
#Remote location of the pre-processed AIS data
folder_name = '2022_PoR'
path_name = f'abfs://ais/parquet/{folder_name}'

#Local project root used to load other data: the part of the working directory
#that precedes the (Windows-style) notebook sub-folder, or the whole working
#directory when that sub-folder is not part of it
current_directory = os.getcwd()
path = current_directory.partition("\\01_Data_Analysis\\02_AIS_data")[0]

### Loads the access token (we use a SAS-token to protect the data)

In [None]:
# Secrets are provided through environment variables (requires python-dotenv).
# Copy the .env.example file and rename it to .env (one directory up from the notebooks).
#
%load_ext dotenv
# Load the environment variables from the .env file (-v: verbose output)
%dotenv -v

In [None]:
# Read the SAS token from the values parsed out of the .env file;
# a missing 'AZURE_BLOB_SAS_TOKEN' entry raises a KeyError here
sas_token = dotenv.dotenv_values()['AZURE_BLOB_SAS_TOKEN']

### Creation of the cluster

In [None]:
#Creates the calculation cluster through the Dask Gateway, using the gateway's default cluster options
gateway = dask_gateway.Gateway()
cluster_options = gateway.cluster_options()
cluster = gateway.new_cluster(cluster_options)
#Lets the cluster scale adaptively between 1 and 100 workers
cluster.adapt(minimum=1, maximum=100)
#Last expression of the cell: shows the cluster in the notebook output
cluster

In [None]:
#Connects a client to the calculation cluster; the dask computations below run through this client
client = dask.distributed.Client(cluster)
#Last expression of the cell: shows the client in the notebook output
client

In [None]:
#Installs the required packages on the workers of the calculation cluster
def worker_setup(dask_worker: dask.distributed.Worker):
    """ 
    Function that installs the packages that the partition functions need on a worker

    The packages are installed with the pip of the interpreter that runs the worker
    (``sys.executable -m pip``), so they end up in the environment the worker imports
    from. A failing installation is logged as a warning instead of being ignored; the
    setup remains best-effort and does not raise.

    Parameters
    ----------
    dask_worker: dask worker on which the callback is executed

    :returns: None
    """
    import logging
    import subprocess
    import sys

    logger = logging.getLogger("distributed.worker")
    for package in ("movingpandas", "more-itertools", "dask"):
        command = [sys.executable, "-m", "pip", "install", "-q", package]
        completed = subprocess.run(command, check=False)
        if completed.returncode != 0:
            logger.warning("pip install of %s failed with exit code %s", package, completed.returncode)

client.register_worker_callbacks(worker_setup)

### Import geospatial data and create areas of interest and vessel types

In [None]:
#Coordinate reference systems: geographic WGS84 and the projected EPSG:28992 system
#(the projected system is stored under the name 'utm')
wgs84 = pyproj.CRS('EPSG:4326')
utm = pyproj.CRS('EPSG:28992')

#Transformation functions in both directions; always_xy=True keeps the (x, y) = (longitude, latitude) axis order
wgs_to_utm = pyproj.Transformer.from_crs(crs_from=wgs84,crs_to=utm,always_xy=True).transform
utm_to_wgs = pyproj.Transformer.from_crs(crs_from=utm,crs_to=wgs84,always_xy=True).transform

In [None]:
#Creates a dictionary with the geospatial areas of interest
from shapely.ops import unary_union  #replaces the deprecated cascaded_union

anchorage_areas = gpd.read_file(path+"\\00_Input_data\\01_Geospatial_data\\anchorage_areas.geojson")
#Rebuilds every anchorage geometry as a polygon
anchorage_areas['geometry'] = [Polygon(geom) for geom in anchorage_areas['geometry']]

#NOTE: unpickling can execute arbitrary code; only load this file from a trusted source.
#The with-statement closes the file handle again after loading.
with open(path+"\\00_Input_data\\01_Geospatial_data\\berths_PoR.pickle",'rb') as berths_file:
    berths = pickle.load(berths_file)

areas_of_interest = {}
areas_of_interest['port_entrance'] = gpd.read_file(path+"\\00_Input_data\\01_Geospatial_data\\Port_Entrance.geojson")['geometry'][0]
#The berth polygons are transformed with utm_to_wgs before they are stored
areas_of_interest['berths'] = transform(utm_to_wgs,MultiPolygon(berths['geometry'].to_list()))
#All anchorage polygons are dissolved into a single geometry
areas_of_interest['anchorage_areas'] = unary_union(list(anchorage_areas['geometry']))

### Create and select trajectories of interest

In [None]:
#Functions
def create_sorting_key(df,columns):
    """ 
    Function that combines columns to create a sorting key 

    The values of the given columns are converted to strings and joined with an
    underscore, in the given order. The key is stored in a new ['sorting_key']
    column that is added to the dataframe in place.

    Parameters
    ----------
    df: pandas dataframe
    columns: list of the column names of the dataframe of which the sorting keys should be composed of

    :returns: pandas dataframe (the same object as df, with the ['sorting_key'] column added)
    :raises ValueError: if columns is empty
    """
    
    columns = list(columns)
    if not columns:
        raise ValueError("at least one column name is required to build a sorting key")

    key_parts = [df[column_name].astype(str) for column_name in columns]
    sorting_key = key_parts[0]
    for key_part in key_parts[1:]:
        sorting_key = sorting_key + '_' + key_part
    df['sorting_key'] = sorting_key
    return df

def add_columns(df,added_columns):
    """ 
    Function that adds new columns with NaN floats to the dataframe

    The columns are added in place; an existing column with the same name is
    overwritten with NaN.

    Parameters
    ----------
    df: pandas dataframe
    added_columns: list of names that should be new columns

    :returns: pandas dataframe (the same object as df)
    """
    
    for added_column in added_columns:
        #np.nan instead of np.NaN: the capitalised alias was removed in NumPy 2.0
        df[added_column] = np.nan
    return df

def create_gdf(df):
    """ 
    Function that turns a pandas dataframe into a geopandas dataframe with point geometries

    The points are built from the ['longitude'] and ['latitude'] columns, which are
    expected to hold WGS84 (EPSG:4326) coordinates.

    Parameters
    ----------
    df: pandas dataframe with ['longitude'] and ['latitude'] columns

    :returns:  geopandas dataframe with the columns of df and crs EPSG:4326
    """
    point_geometries = geopandas.points_from_xy(df.longitude, df.latitude)
    return geopandas.GeoDataFrame(df,columns=df.columns,crs="EPSG:4326",geometry=point_geometries)

def transform_projection(gdf,crs):
    """ 
    Function that reprojects the geometries of a geopandas dataframe to another coordinate reference system

    Parameters
    ----------
    gdf: geopandas dataframe
    crs: target coordinate reference system as a string in a 'EPSG:#'-format

    :returns:  geopandas dataframe (the result of gdf.to_crs)
    """
    
    return gdf.to_crs(crs)

def trajectorize(gdf):
    """ 
    Function that trajectorizes the AIS data in a geopandas dataframe using the following columns: ['name',
    'timestamplast','longitude','latitude']

    One trajectory is created per vessel name, with 'timestamplast' as time column.
    Speed, direction and acceleration are (re)computed on every trajectory.

    Parameters
    ----------
    gdf: geopandas dataframe

    :returns:  movingpandas trajectory collection
    """
    import movingpandas as mpd
    #x is the longitude column and y the latitude column. According to the movingpandas
    #documentation these names are only used to build point geometries when the input is
    #not a geodataframe, so for the geodataframes passed in this notebook the geometry
    #column is used instead.
    traj_collection = mpd.TrajectoryCollection(gdf,traj_id_col='name',t='timestamplast',x='longitude',y='latitude')
    traj_collection.add_speed(overwrite=True)
    traj_collection.add_direction(overwrite=True)
    traj_collection.add_acceleration(overwrite=True)
    return traj_collection

def splitter(traj_collection,max_diameter,min_duration,min_length,gap):
    """ 
    Function that splits movingpandas trajectories in two passes: first on observation
    gaps, then on stops

    Parameters
    ----------
    traj_collection: movingpandas trajectory collection
    max_diameter: diameter in meters that holds as a boundary box for the stop condition
    min_duration: minimum duration as pandas timedelta that holds as a timeframe for the stop condition
    min_length: length in meters which a splitted trajectory should have in order to be stored in the collection
    gap: time gap as pandas timedelta on which a trajectory is split in the first pass

    :returns:  movingpandas trajectory collection
    """
    import movingpandas as mpd
    #Pass 1: split wherever consecutive observations are further apart in time than the gap
    gap_splitter = mpd.ObservationGapSplitter(traj_collection)
    gap_split_trajs = gap_splitter.split(gap=gap)
    #Pass 2: split the resulting trajectories at the detected stops
    stop_splitter = mpd.StopSplitter(gap_split_trajs)
    return stop_splitter.split(max_diameter=max_diameter,min_duration=min_duration,min_length=min_length)

def traj_to_df(trajs):
    """ 
    Function that converts a movingpandas trajectory collection into a pandas dataframe

    The dataframe is built from the line geodataframe of the collection (to_line_gdf).

    Parameters
    ----------
    trajs: movingpandas trajectory collection
    
    :returns:  pandas dataframe
    """
    
    line_gdf = trajs.to_line_gdf()
    return pd.DataFrame(line_gdf)

def create_splitted_trajectories(df,crs,max_diameter,min_duration,min_length,gap):
    """ 
    Function that assigns trajectory IDs to the AIS data in a pandas dataframe

    The data is converted to point geometries, reprojected, trajectorized per vessel
    name and split on time gaps and stops. The split trajectories are returned as a
    dataframe with the same columns as the input.

    Parameters
    ----------
    df: pandas dataframe
    crs: coordinate reference system as a string in a 'EPSG:#'-format
    max_diameter: diameter in meters that holds as a boundary box for the stop condition
    min_duration: minimum duration as pandas timedelta that holds as a timeframe for the stop condition
    min_length: length in meters which a splitted trajectory should have in order to be stored in the collection
    gap: time gap as pandas timedelta

    :returns: pandas dataframe
    """
    import movingpandas as mpd
    #The column layout of the input is restored on the output at the end
    original_columns = df.columns
    projected_gdf = transform_projection(create_gdf(df),crs)
    split_collection = splitter(trajectorize(projected_gdf),max_diameter,min_duration,min_length,gap)
    split_collection.add_traj_id(overwrite=True)
    #The line dataframe names its time column 't'; map it back to the AIS column name
    split_df = traj_to_df(split_collection).rename(columns={"t": "timestamplast"})
    return split_df.reindex(columns=original_columns)

def trajectories_in_areas_of_interest(df,areas_of_interest):
    """ 
    Function that flags for every AIS message whether its location intersects the areas of interest

    For every area a boolean column with the name of the area is added. Only a fixed
    selection of AIS columns plus these area columns is returned. Dataframes with two
    rows or fewer yield an empty dataframe with the same columns.

    Parameters
    ----------
    df: pandas dataframe containing AIS data with longitudes and latitudes in WGS84 in ['longitude'] and 
        ['latitude'] columns, respectively, and trajectory IDs in a ['traj_id'] column
    areas_of_interest: dictionary with areas of interest names as keys and shapely geometries as values

    :returns: pandas dataframe containing AIS data
    """
    
    ais_columns = ['name','traj_id','vesseltype','hazardouscargo','length','width','draughtMarine','timestamplast','longitude','latitude','sog','heading','speed','direction','acceleration']
    selected_columns = ais_columns + list(areas_of_interest)
    if len(df) <= 2:
        return pd.DataFrame(columns = selected_columns)

    gdf = create_gdf(df)
    for area_name,area_geometry in areas_of_interest.items():
        gdf[area_name] = gdf['geometry'].apply(area_geometry.intersects)
    return gdf[selected_columns]

In [None]:
#Loads the pre-processed AIS data of the selected vessels lazily as a dask dataframe;
#the storage account is accessed with the SAS token
ddf = dd.read_parquet(path_name+'/selected_vessels_for_further_analysis', storage_options={"account_name": "rwsais", "sas_token": sas_token})

In [None]:
#Performs the functions on the AIS data partitions and saves the data as parquet files in the Azure storage
ddf_i = ddf
#Sorts every partition with the pandas method itself: the sort_values helper of this
#notebook is only defined in a later cell and is not available yet at this point
ddf_i = ddf_i.map_partitions(pd.DataFrame.sort_values,by=['name','timestamplast'])
#Placeholder columns, so the split trajectories can be re-indexed to one fixed column layout
ddf_i = ddf_i.map_partitions(add_columns,added_columns=['direction', 'prev_t', 'speed', 'acceleration', 'traj_id','geometry'])
#Splits the data into trajectories with create_splitted_trajectories (defined above)
ddf_i = ddf_i.map_partitions(create_splitted_trajectories,crs='EPSG:32631',max_diameter=25,min_duration=datetime.timedelta(minutes=30),min_length=0,gap=datetime.timedelta(hours=6),meta=ddf_i)
ddf_i['traj_id']=ddf_i['traj_id'].astype(str)
ddf_i = ddf_i.map_partitions(trajectories_in_areas_of_interest,areas_of_interest)
ddf_i = ddf_i.repartition(partition_size="10MB")
#Explicit pyarrow types for all columns that are written
scheme_information = {'name': pa.string(),
                      'traj_id': pa.string(),
                      'vesseltype': pa.int64(),
                      'hazardouscargo': pa.int64(),
                      'length': pa.float64(),
                      'width': pa.float64(),
                      'draughtMarine': pa.float64(),
                      'timestamplast': pa.timestamp('ns', tz='UTC'),
                      'longitude': pa.float64(),
                      'latitude': pa.float64(),
                      'sog': pa.float64(),
                      'heading': pa.float64(),
                      'speed': pa.float64(),
                      'direction': pa.float64(),
                      'acceleration': pa.float64(),
                      'anchorage_areas': pa.bool_(),
                      'port_entrance': pa.bool_(),
                      'berths': pa.bool_()}
ddf_i.to_parquet(path_name+'/trajectories', storage_options={"account_name": "rwsais", "sas_token": sas_token},schema=scheme_information,engine='pyarrow',write_index=False)

### Sort trajectories
The trajectories have to be sorted by name and timestamp to prepare the data for the next steps.

In [None]:
#Functions
def reset_index(df):
    """ 
    Function that moves the index of a pandas dataframe back into a regular column

    The index is kept as a column (drop=False) and replaced by a default integer index.

    Parameters
    ----------
    df: pandas dataframe

    :returns: pandas dataframe (a new object; df itself is not modified)
    """
    
    return df.reset_index(drop=False)

def sort_values(df,columns):
    """ 
    Function that sorts the rows of a dataframe on one or more columns, in ascending order

    Parameters
    ----------
    df: pandas dataframe
    columns: list of column names to sort on, in order of priority

    :returns: pandas dataframe (a new, sorted object; df itself is not modified)
    """
    
    return df.sort_values(columns)

def remame_vessels(df):
    """ 
    Function that strips everything from the first underscore onwards from the vessel names

    Only string names are changed; other values (e.g. missing names) are kept as they
    are. The ['name'] column is replaced in place.

    Parameters
    ----------
    df: pandas dataframe with a ['name'] column

    :returns: pandas dataframe (the same object as df)
    """
    df['name'] = [vessel_name.split('_')[0] if isinstance(vessel_name,str) else vessel_name
                  for vessel_name in df['name'].to_list()]
    return df

In [None]:
#Performs the functions on the AIS data partitions and saves the data as parquet files in the Azure storage
cluster.scale(n=5) #Shuffling of data over partitions must be done using few workers to meet criteria
ddf_i = ddf.partitions[:]
ddf_i = ddf_i.map_partitions(remame_vessels)
#Setting the index on the name shuffles the data over the partitions by vessel name
ddf_i = ddf_i.set_index('name')
#Moves the name back into a regular column
ddf_i = ddf_i.map_partitions(reset_index)
ddf_i = ddf_i.map_partitions(sort_values,['name','timestamplast'])
ddf_i.to_parquet(path_name+'/sorted_trajectories', storage_options={"account_name": "rwsais", "sas_token": sas_token})
#Switches back to adaptive scaling once the shuffle has been written
cluster.adapt(minimum=1, maximum=100)

### Merging of trajectories that were split by the partitions

In [None]:
def merge_tracks(track1,track2,wgs84_to_utm=None):
    """ 
    Function that merges two rows of the trajectory dataframe into one trajectory

    Name, departure, draught and origin are taken from the first trajectory; arrival
    and destination from the second. The per-message data (times, coordinates, sog,
    cog, speed, direction, acceleration) of both trajectories is combined and ordered
    on the timestamps. An area flag is True if it is True for either trajectory.
    Geometry, distance and duration are recomputed from the merged data. Columns that
    are not mentioned here are set to NaN.

    Parameters
    ----------
    track1: pandas series with first trajectory 
    track2: pandas series with second trajectory  
    wgs84_to_utm: function that transforms coordinates from WGS84 to a coordinate reference system in
                  meters, used to compute the distance; defaults to a transformation to 'EPSG:32631'

    :returns: pandas series with the merged trajectory, with the same name (row label) as track1
    """
    
    if wgs84_to_utm is None:
        #Falls back to the projected crs that this notebook uses for the trajectories
        wgs84_to_utm = pyproj.Transformer.from_crs('EPSG:4326','EPSG:32631',always_xy=True).transform

    #Chronological order of the combined messages; sorted() is stable, so messages
    #with equal timestamps keep their relative order
    raw_times = list(track1['times']) + list(track2['times'])
    order = sorted(range(len(raw_times)),key=raw_times.__getitem__)

    merged = dict.fromkeys(track1.index,np.nan)
    for column in track1.index:
        info1,info2 = track1[column],track2[column]
        if column in ['name','departure','draught','origin']:
            merged[column] = info1
        elif column in ['arrival','destination']:
            merged[column] = info2
        elif column in ['times','coordinates','sog','cog','speed','direction','acceleration']:
            combined = list(info1) + list(info2)
            merged[column] = [combined[position] for position in order]
        elif column in ['anchorage_areas','port_entrance','berths']:
            merged[column] = True in [info1,info2]
    merged['geometry'] = LineString(merged['coordinates'])
    merged['distance'] = transform(wgs84_to_utm,merged['geometry']).length
    merged['duration'] = merged['arrival']-merged['departure']
    #dtype=object keeps the list values as single cells of the series
    return pd.Series(merged,dtype=object,name=track1.name)

def merge_trajectories(df,gap,distance,utm):
    """ 
    Function that merges trajectories based on their gap in time and space.

    The trajectories of each vessel are processed in row order. A trajectory is merged
    into the previous one if it departs before or at the arrival of the previous one,
    or if the gap in time is at most 5*gap, the gap in space at most 5*distance and
    both trajectories have the same draught. Trajectories with a missing (NaN) draught
    never satisfy the second condition.

    Parameters
    ----------
    df: pandas dataframe with on each row a trajectory, with origin and destination in WGS84
    gap: time as a pandas timedelta; two trajectories of the same vessel that are at most 5 times this
         time apart can be classified as the same trajectory
    distance: distance in meters; two trajectories of the same vessel that are at most 5 times this
              distance apart can be classified as the same trajectory
    utm: Universal Transverse Mercator or other EPSG with units in meters as a string in 'EPSG:#'-format 

    :returns: pandas dataframe
    """
    
    #Nothing to merge in an empty partition. The 'foo' check skips placeholder data;
    #presumably the dummy rows that dask generates when it infers metadata.
    if df.empty or df['origin'].iloc[0] == 'foo':
        return df
    wgs84_to_utm = pyproj.Transformer.from_crs(pyproj.CRS('EPSG:4326'),pyproj.CRS(utm),always_xy=True).transform
    
    merged_rows = []
    for name in dict.fromkeys(df['name'].to_numpy()):
        df_ship = df[df.name == name]
        if df_ship.empty:
            continue
        traj_prev = df_ship.iloc[0]
        ship_rows = [traj_prev]
        for position in range(1,len(df_ship)):
            traj_next = df_ship.iloc[position]
            #Origin and destination are WGS84 points: project them to the metric crs,
            #so that the spatial gap is expressed in meters
            end_point = transform(wgs84_to_utm,traj_prev['destination'])
            start_point = transform(wgs84_to_utm,traj_next['origin'])
            deltatime = traj_next['departure']-traj_prev['arrival']
            deltadistance = end_point.distance(start_point)
            overlapping = deltatime <= pd.Timedelta(0)
            same_draught = traj_prev['draught'] == traj_next['draught']
            nearby = deltatime <= 5*gap and deltadistance <= 5*distance and same_draught
            if overlapping or nearby:
                traj_prev = merge_tracks(traj_prev,traj_next,wgs84_to_utm)
                ship_rows[-1] = traj_prev
            else:
                traj_prev = traj_next
                ship_rows.append(traj_prev)
        merged_rows.extend(ship_rows)
    if not merged_rows:
        return pd.DataFrame(columns=df.columns)
    return pd.DataFrame(merged_rows,columns=df.columns)

def create_trajectory_dataframe(df,trajectory_columns,areas_of_interest,utm):
    """ 
    Function that creates a trajectory dataframe with on each row a separate trajectory 

    The AIS messages are grouped per vessel name and trajectory ID. Groups with fewer
    than two messages are skipped. Each row holds, in this order: name, departure,
    arrival, origin, destination, distance, duration, draught (most frequent value),
    geometry, times, coordinates, sog, cog, speed, direction, acceleration and the
    flags for the 'anchorage_areas', 'port_entrance' and 'berths' areas. Origin,
    destination, geometry and coordinates are stored in WGS84; the distance is the
    length in the units of the utm crs. The row label is the sequence number of the
    trajectory within its vessel.

    Parameters
    ----------
    df: pandas dataframe with AIS data
    trajectory_columns: column names of trajectory dataframe (19 names, in the order listed above)
    areas_of_interest: dictionary with areas of interest names as keys and shapely geometries as values;
                       must contain the keys 'anchorage_areas', 'port_entrance' and 'berths'
    utm: Universal Transverse Mercator or other EPSG with units in meters as a string in 'EPSG:#'-format 

    :returns: pandas dataframe with AIS data trajectories
    """
    
    #An empty partition yields an empty trajectory dataframe with the expected columns
    if len(df) == 0:
        return pd.DataFrame(columns=trajectory_columns)

    import movingpandas as mpd
    utm = pyproj.CRS(utm)
    utm_to_wgs84 = pyproj.Transformer.from_crs(utm, pyproj.CRS('EPSG:4326'), always_xy=True).transform
    
    #The rows are collected first and turned into a dataframe once, at the end
    rows = []
    row_labels = []
    for name in dict.fromkeys(df['name'].to_numpy()):
        df_ship = df[df.name == name]
        for index,traj_id in enumerate(dict.fromkeys(df_ship['traj_id'].to_numpy())):
            df_traj = df_ship[df_ship.traj_id == traj_id]
            if len(df_traj) <= 1:
                continue
            df_traj = geopandas.GeoDataFrame(df_traj,columns=df_traj.columns,crs="EPSG:4326",geometry=geopandas.points_from_xy(df_traj.longitude, df_traj.latitude))
            df_traj = df_traj.to_crs(utm)
            trajectory = mpd.Trajectory(df_traj,traj_id=traj_id,t='timestamplast',x='longitude',y='latitude')
            #The trajectory can hold fewer rows than its input
            if len(trajectory.df) <= 1:
                continue
            departure = trajectory.get_start_time()
            arrival = trajectory.get_end_time()
            origin = transform(utm_to_wgs84,trajectory.get_start_location())
            destination = transform(utm_to_wgs84,trajectory.get_end_location())
            distance = trajectory.get_length()
            duration = trajectory.get_duration()
            #Most frequent draught of the trajectory; NaN if no draught is known
            draught_modes = trajectory.df['draughtMarine'].mode().to_numpy()
            draught = draught_modes[0] if len(draught_modes) > 0 else np.nan
            geometry = transform(utm_to_wgs84,trajectory.to_linestring())
            #Seconds since the epoch, converted with an explicit UTC timezone so that
            #the result does not depend on the local timezone of the worker
            epoch_seconds = (trajectory.df.index.to_numpy()-np.datetime64(0,'s'))/np.timedelta64(1, 's')
            times = [datetime.datetime.fromtimestamp(seconds,tz=datetime.timezone.utc) for seconds in epoch_seconds]
            coordinates = [transform(utm_to_wgs84,point) for point in trajectory.df['geometry'].to_numpy()]
            sog = trajectory.df['sog'].to_numpy()
            cog = trajectory.df['heading'].to_numpy()
            speed = trajectory.df['speed'].to_numpy()
            direction = trajectory.df['direction'].to_numpy()
            acceleration = trajectory.df['acceleration'].to_numpy()
            anchorage_areas = areas_of_interest['anchorage_areas'].intersects(geometry)
            port_entrance = areas_of_interest['port_entrance'].intersects(geometry)
            berths = areas_of_interest['berths'].intersects(geometry)
            rows.append([name,departure,arrival,origin,destination,distance,duration,draught,geometry,times,coordinates,sog,cog,speed,direction,acceleration,anchorage_areas,port_entrance,berths])
            row_labels.append(index)
    if not rows:
        return pd.DataFrame(columns=trajectory_columns)
    return pd.DataFrame(data=rows,columns=trajectory_columns,index=row_labels)

def change_data_format(df,data_columns):
    """ 
    Function that transforms the geometry data into string data in order to save it in dask

    A value that is a list or a tuple is replaced by a list with the string of each of
    its items; every other value is replaced by its string. All rows are kept in their
    original order and the input dataframe is not modified.

    Parameters
    ----------
    df: pandas dataframe with geometries
    data_columns: column names of geometries

    :returns: pandas dataframe (a copy of df with the converted columns)
    """
    
    def stringify(value):
        if isinstance(value,(list,tuple)):
            return [str(item) for item in value]
        return str(value)

    changed_df = df.copy()
    for data_column in data_columns:
        changed_df[data_column] = changed_df[data_column].map(stringify)
    return changed_df

In [None]:
#Loads the sorted AIS data lazily as a dask dataframe and sets the input values
ddf = dd.read_parquet(path_name+'/sorted_trajectories', storage_options={"account_name": "rwsais", "sas_token": sas_token})
#Columns of the trajectory dataframe (one trajectory per row); create_trajectory_dataframe fills its rows in this order
trajectory_columns = ['name','departure','arrival','origin','destination','distance','duration','draught','geometry','times','coordinates','sog','cog','speed','direction','acceleration','anchorage_areas','port_entrance','berths']

In [None]:
#Performs the functions on the AIS data partitions and saves the data as parquet files in the Azure storage
ddf_i = ddf.partitions[:]
#Empty dataframe with the trajectory columns that describes the output layout (meta) to dask
trajectory_meta = pd.DataFrame(columns=trajectory_columns)
ddf_i = ddf_i.map_partitions(create_trajectory_dataframe,trajectory_columns=trajectory_columns,areas_of_interest=areas_of_interest,utm='EPSG:32631',meta=trajectory_meta)
ddf_i = ddf_i.map_partitions(merge_trajectories,gap=pd.Timedelta(minutes=1),distance=200,utm='EPSG:32631',meta=trajectory_meta)
#The geometries are converted to strings before they are written
ddf_i = ddf_i.map_partitions(change_data_format,data_columns=['origin','destination','geometry','coordinates'])
ddf_i = ddf_i.map_partitions(remame_vessels)
#Explicit pyarrow types for all columns that are written
scheme_information = {'name': pa.string(),
                      'departure': pa.timestamp('ns', tz='UTC'),
                      'arrival': pa.timestamp('ns', tz='UTC'),
                      'origin': pa.string(),
                      'destination': pa.string(),
                      'distance': pa.float64(),
                      'duration': pa.duration('ns'),
                      'draught': pa.float64(),
                      'geometry': pa.string(),
                      'times': pa.list_(pa.timestamp('us', tz='UTC')),
                      'coordinates': pa.list_(pa.string()),
                      'sog': pa.list_(pa.float64()),
                      'cog': pa.list_(pa.float64()),
                      'speed': pa.list_(pa.float64()),
                      'direction': pa.list_(pa.float64()),
                      'acceleration': pa.list_(pa.float64()),
                      'anchorage_areas': pa.bool_(),
                      'port_entrance': pa.bool_(),
                      'berths': pa.bool_()}
ddf_i.to_parquet(path_name+'/merged_sorted_trajectories',storage_options={"account_name": "rwsais", "sas_token": sas_token},schema=scheme_information,engine='pyarrow')

### Isolate idle trajectories
The parts of the trajectories that were cut out by the stop splitter (the idle periods) were removed in the previous steps and should be restored for further analysis.

In [None]:
#Functions
def create_conditions(df,start_date):
    """ 
    Function that finds the timeframes over which a vessel does not have AIS data that was stored in the
    sailing trajectories

    For every vessel one timeframe is created per trajectory: from the arrival of the
    previous trajectory of that vessel (or from the start date for its first
    trajectory) until the departure of the trajectory. The start date is applied to
    every vessel separately. No timeframe is created after the last arrival of a vessel.

    Parameters
    ----------
    df: pandas dataframe with on each row a trajectory, with ['name'], ['departure'] and ['arrival'] columns
    start_date: pandas timestamp that indicates the start date of the conditions

    :returns: dictionary with for each vessel name a list of [start, stop] timeframes
    """
    
    conditions = {}
    for name in dict.fromkeys(df.name):
        df_ship = df[df.name == name]
        #Every vessel starts again at the start date, not at the last arrival of the previous vessel
        timeframe_start = start_date
        timeframes = []
        for departure,arrival in zip(df_ship['departure'],df_ship['arrival']):
            timeframes.append([timeframe_start,pd.Timestamp(departure)])
            timeframe_start = pd.Timestamp(arrival)
        conditions[name] = timeframes
    return conditions
    
def read_conditions(conditions):
    """ 
    Function that reads timeframe conditions for each vessel and stores it in a dataframe

    All vessels of every dictionary are read. The rows follow the order of the
    dictionaries, the vessels in them and their timeframes.

    Parameters
    ----------
    conditions: iterable of dictionaries (or a single dictionary) that contain for each vessel name a
                list of [start, stop] timeframes

    :returns: pandas dataframe with ['Name'], ['Time_start'] and ['Time_stop'] columns and a default index
    """
    
    if isinstance(conditions,dict):
        conditions = [conditions]
    records = [(ship_name,time_start,time_stop)
               for dictionary in conditions
               for ship_name,timeframes in dictionary.items()
               for time_start,time_stop in timeframes]
    return pd.DataFrame(records,columns=['Name','Time_start','Time_stop'])
    
def find_untrajectorized_data(df,condition_df):
    """ 
    Function that selects the AIS data messages that fall within a conditional timeframe

    Per vessel only the timeframes are used that start strictly between the first and
    the last timestamp of that vessel in df. The messages strictly inside such a
    timeframe are selected and get the start of the timeframe (as a string) as their
    trajectory ID. The input dataframe is not modified.

    Parameters
    ----------
    df: pandas dataframe with AIS data, with ['name'] and ['timestamplast'] columns
    condition_df: dataframe with the timeframe conditions for each ship, with ['Name'], ['Time_start']
                  and ['Time_stop'] columns

    :returns: pandas dataframe with the selected messages and a ['traj_id'] column; an empty dataframe
              with the columns of df if no message is selected
    """
    
    residual_parts = []
    for name in dict.fromkeys(df.name):
        df_ship = df[df.name == name]
        if df_ship.empty:
            continue
        t_start = df_ship['timestamplast'].iloc[0]
        t_stop = df_ship['timestamplast'].iloc[-1]
        df_conditions = condition_df[((condition_df.Name == name) & 
                                      (condition_df.Time_start > t_start) & 
                                      (condition_df.Time_start < t_stop))]
        for _,info in df_conditions.iterrows():
            #The mask is built on the rows of this vessel, so that it lines up with df_ship
            in_timeframe = (df_ship.timestamplast > info.Time_start) & (df_ship.timestamplast < info.Time_stop)
            residual_df_ship = df_ship[in_timeframe]
            if len(residual_df_ship):
                #assign returns a new dataframe, so the selection of df is not written to
                residual_parts.append(residual_df_ship.assign(traj_id=str(info.Time_start)))
    if not residual_parts:
        return pd.DataFrame(columns=list(df.columns))
    return pd.concat(residual_parts)

def create_idle_trajectories(df,crs):
    """ 
    Function that creates trajectories of the AIS data per trajectory ID and returns them as a dataframe

    The data is converted to point geometries and reprojected. For every trajectory
    ID with at least two distinct timestamps a trajectory is created, for which speed,
    direction and acceleration are (re)computed. The trajectories are returned as a
    dataframe with the same columns as the input.

    Parameters
    ----------
    df: pandas dataframe with AIS data, with ['longitude'], ['latitude'], ['timestamplast'] and
        ['traj_id'] columns
    crs: coordinate reference system as a string in a 'EPSG:#'-format

    :returns: pandas dataframe with AIS data trajectories
    """
    
    import movingpandas as mpd
    #Local helpers, so that the function does not depend on other cells of the notebook
    def create_gdf(df):
        gdf = geopandas.GeoDataFrame(df,columns=df.columns,crs="EPSG:4326",geometry=geopandas.points_from_xy(df.longitude, df.latitude))
        return gdf
    
    def transform_projection(gdf,crs):
        transformed_gdf = gdf.to_crs(crs)
        return transformed_gdf
    
    def trajectorize(gdf,traj_id):
        #x is the longitude column and y the latitude column. According to the movingpandas
        #documentation these names are only used when the input is not a geodataframe.
        traj = mpd.Trajectory(gdf,traj_id=traj_id,t='timestamplast',x='longitude',y='latitude')
        traj.add_speed(overwrite=True)
        traj.add_direction(overwrite=True)
        traj.add_acceleration(overwrite=True)
        return traj
    
    def traj_to_df(trajs):
        traj_df = pd.DataFrame(trajs.to_line_gdf())
        return traj_df
    
    column_names = df.columns
    gdf = create_gdf(df)
    transformed_gdf = transform_projection(gdf,crs)
    trajectories = []
    for traj_id in dict.fromkeys(df.traj_id):
        sub_df = transformed_gdf[transformed_gdf.traj_id == traj_id]
        #A trajectory needs at least two messages with different timestamps
        if len(sub_df.drop_duplicates('timestamplast')) >= 2:
            trajectories.append(trajectorize(sub_df,traj_id))
            
    traj_collection = mpd.TrajectoryCollection(trajectories)
    traj_collection_df = pd.DataFrame(columns=column_names)
    if traj_collection:
        traj_collection_df = traj_to_df(traj_collection)
        #The line dataframe names its time column 't'; map it back to the AIS column name
        traj_collection_df = traj_collection_df.rename(columns={"t": "timestamplast"}) 
        traj_collection_df = traj_collection_df.reindex(columns=column_names)
    return traj_collection_df

In [None]:
#Loads the merged trajectories (one trajectory per row) lazily as a dask dataframe
ddf = dd.read_parquet(path_name+'/merged_sorted_trajectories',storage_options={"account_name": "rwsais", "sas_token": sas_token})

In [None]:
#Performs the functions on the AIS data partitions and computes to get the result
ddf_i = ddf.partitions[:]
#create_conditions returns a dictionary per partition, with the timeframes per vessel name
#NOTE(review): the mapped function returns a dict and no meta is passed - confirm that
#compute() returns the per-partition dictionaries in the form that read_conditions expects
ddf_i = ddf_i.map_partitions(create_conditions,start_date=pd.Timestamp('2022-01-01 00:00:00+0000', tz='UTC'))
conditions = ddf_i.compute()
#Combines the timeframes into one dataframe
condition_df = read_conditions(conditions)

In [None]:
#Loads the pre-processed AIS data of the selected vessels again, lazily as a dask dataframe
ddf = dd.read_parquet(path_name+'/selected_vessels_for_further_analysis', storage_options={"account_name": "rwsais", "sas_token": sas_token})

In [None]:
# Runs the processing steps on every partition of the AIS data and stores the
# outcome as parquet files in the Azure storage.
ddf_i = (
    ddf.partitions[:]
    .map_partitions(sort_values, ['name', 'timestamplast'])
    .map_partitions(
        add_columns,
        added_columns=['direction', 'prev_t', 'speed', 'acceleration', 'traj_id', 'geometry'],
    )
    .map_partitions(find_untrajectorized_data, condition_df=condition_df)
)
# The frame as it is before trajectory creation serves as the meta of the result
ddf_i = ddf_i.map_partitions(create_trajectories, crs='EPSG:32631', meta=ddf_i)
ddf_i['traj_id'] = ddf_i['traj_id'].astype(str)
ddf_i = ddf_i.map_partitions(trajectories_in_areas_of_interest, areas_of_interest)

# Column types enforced on write; every area of interest is stored as a boolean column
scheme_information = {
    **{column: pa.int64() for column in ('vesseltype', 'hazardouscargo')},
    **{column: pa.float64() for column in ('length', 'width', 'draughtMarine')},
    'timestamplast': pa.timestamp('ns', tz='UTC'),
    **{
        column: pa.float64()
        for column in (
            'longitude',
            'latitude',
            'sog',
            'heading',
            'speed',
            'direction',
            'acceleration',
        )
    },
}
scheme_information.update({area: pa.bool_() for area in areas_of_interest})

ddf_i.to_parquet(
    f"{path_name}/idle_trajectories",
    storage_options=dict(account_name="rwsais", sas_token=sas_token),
    write_index=False,
    schema=scheme_information,
    engine='pyarrow',
)

### Sort idle trajectories

In [None]:
# Request 5 workers for the sorting step in the next cell
# (that cell switches the cluster back to adaptive scaling when it is done)
cluster.scale(n=5)

In [None]:
# Re-index the idle trajectories on the vessel name, turn the index back into
# a column and apply the sort_values helper on name and timestamp per partition.
ddf = dd.read_parquet(
    f"{path_name}/idle_trajectories",
    storage_options=dict(account_name="rwsais", sas_token=sas_token),
)
ddf_i = (
    ddf.set_index('name')
    .map_partitions(reset_index)
    .map_partitions(sort_values, ['name', 'timestamplast'])
)
ddf_i.to_parquet(
    f"{path_name}/sorted_idle_trajectories",
    storage_options=dict(account_name="rwsais", sas_token=sas_token),
)
# Switch the cluster back to adaptive scaling (between 1 and 100 workers)
cluster.adapt(minimum=1, maximum=100)

### Merge idle trajectories

In [None]:
# Open the sorted idle trajectories (parquet) from the blob storage
ddf = dd.read_parquet(
    f"{path_name}/sorted_idle_trajectories",
    storage_options=dict(account_name="rwsais", sas_token=sas_token),
)

In [None]:
# Fetch the records of one vessel from the first partition for inspection
first_partition = ddf.partitions[0]
df = first_partition[first_partition['name'] == 'testschip-10'].compute()

In [None]:
# Open the sorted idle trajectories, apply create_trajectory_dataframe,
# merge_trajectories and change_data_format to every partition and store the
# outcome as parquet files.
ddf = dd.read_parquet(
    f"{path_name}/sorted_idle_trajectories",
    storage_options=dict(account_name="rwsais", sas_token=sas_token),
)
ddf_i = (
    ddf.partitions[:]
    .map_partitions(
        create_trajectory_dataframe,
        trajectory_dataframe=trajectory_dataframe,
        areas_of_interest=areas_of_interest,
        utm='EPSG:32631',
        meta=trajectory_dataframe,
    )
    .map_partitions(
        merge_trajectories,
        gap=pd.Timedelta(minutes=1),
        distance=200,  # NOTE(review): presumably metres in the UTM projection - confirm
        utm='EPSG:32631',
        meta=trajectory_dataframe,
    )
    .map_partitions(
        change_data_format,
        data_columns=['origin', 'destination', 'geometry', 'coordinates'],
    )
)

# Column types enforced on write
scheme_information = {
    'name': pa.string(),
    **{column: pa.timestamp('ns', tz='UTC') for column in ('departure', 'arrival')},
    **{column: pa.string() for column in ('origin', 'destination')},
    'distance': pa.float64(),
    'duration': pa.duration('ns'),
    'draught': pa.float64(),
    'geometry': pa.string(),
    'times': pa.list_(pa.timestamp('us', tz='UTC')),
    'coordinates': pa.list_(pa.string()),
    **{
        column: pa.list_(pa.float64())
        for column in ('sog', 'cog', 'speed', 'direction', 'acceleration')
    },
    **{column: pa.bool_() for column in ('anchorage_areas', 'port_entrance', 'berths')},
}
ddf_i.to_parquet(
    f"{path_name}/merged_sorted_idle_trajectories",
    storage_options=dict(account_name="rwsais", sas_token=sas_token),
    schema=scheme_information,
    engine='pyarrow',
)

### Merge sailing and idle trajectories

In [None]:
# Open both trajectory sets: the merged sorted trajectories and the merged
# sorted idle trajectories
trajectories_ddf = dd.read_parquet(
    f"{path_name}/merged_sorted_trajectories",
    storage_options=dict(account_name="rwsais", sas_token=sas_token),
)
residual_ddf = dd.read_parquet(
    f"{path_name}/merged_sorted_idle_trajectories",
    storage_options=dict(account_name="rwsais", sas_token=sas_token),
)

In [None]:
# Stack the trajectories and the idle (residual) trajectories into one dask dataframe
final_ddf = dd.concat([trajectories_ddf,residual_ddf])

In [None]:
# Column types enforced when the combined trajectories are written.
# NOTE(review): 'times' uses nanosecond resolution here, whereas the other
# trajectory schemas in this notebook use 'us' - confirm which one is intended.
scheme_information = {
    'name': pa.string(),
    **{column: pa.timestamp('ns', tz='UTC') for column in ('departure', 'arrival')},
    **{column: pa.string() for column in ('origin', 'destination')},
    'distance': pa.float64(),
    'duration': pa.duration('ns'),
    'draught': pa.float64(),
    'geometry': pa.string(),
    'times': pa.list_(pa.timestamp('ns', tz='UTC')),
    'coordinates': pa.list_(pa.string()),
    **{
        column: pa.list_(pa.float64())
        for column in ('sog', 'cog', 'speed', 'direction', 'acceleration')
    },
    **{column: pa.bool_() for column in ('anchorage_areas', 'port_entrance', 'berths')},
}
final_ddf.to_parquet(
    f"{path_name}/all_merged_trajectories",
    storage_options=dict(account_name="rwsais", sas_token=sas_token),
    schema=scheme_information,
    engine='pyarrow',
)

### Sort trajectories and compact

In [None]:
# Open the combined trajectories (parquet) from the blob storage
final_ddf = dd.read_parquet(
    f"{path_name}/all_merged_trajectories",
    storage_options=dict(account_name="rwsais", sas_token=sas_token),
)

In [None]:
# Apply the reset_index and sort_values helpers (on name and departure) to
# every partition of the combined trajectories and store the sorted result.
final_ddf_i = (
    final_ddf.partitions[:]
    .map_partitions(reset_index)
    .map_partitions(sort_values, ['name', 'departure'])
)

# Column types enforced on write
scheme_information = {
    'name': pa.string(),
    'departure': pa.timestamp('ns', tz='UTC'),
    'arrival': pa.timestamp('ns', tz='UTC'),
    'origin': pa.string(),
    'destination': pa.string(),
    'distance': pa.float64(),
    'duration': pa.duration('ns'),
    'draught': pa.float64(),
    'geometry': pa.string(),
    'times': pa.list_(pa.timestamp('us', tz='UTC')),
    'coordinates': pa.list_(pa.string()),
    'sog': pa.list_(pa.float64()),
    'cog': pa.list_(pa.float64()),
    'speed': pa.list_(pa.float64()),
    'direction': pa.list_(pa.float64()),
    'acceleration': pa.list_(pa.float64()),
    'anchorage_areas': pa.bool_(),
    'port_entrance': pa.bool_(),
    'berths': pa.bool_(),
}

# Write the sorted frame (final_ddf_i). Writing final_ddf instead would store
# the data exactly as it was read and silently drop the sorting done above.
final_ddf_i.to_parquet(
    path_name + '/all_merged_sorted_trajectories',
    storage_options={"account_name": "rwsais", "sas_token": sas_token},
    schema=scheme_information,
    engine='pyarrow',
)

In [None]:
# Open the sorted trajectories, reduce them to 50 partitions and store the
# repartitioned data set.
ddf = dd.read_parquet(
    f"{path_name}/all_merged_sorted_trajectories",
    storage_options=dict(account_name="rwsais", sas_token=sas_token),
)
ddf_i = ddf.partitions[:].repartition(npartitions=50)

# Column types enforced on write
scheme_information = {
    'name': pa.string(),
    **{column: pa.timestamp('ns', tz='UTC') for column in ('departure', 'arrival')},
    **{column: pa.string() for column in ('origin', 'destination')},
    'distance': pa.float64(),
    'duration': pa.duration('ns'),
    'draught': pa.float64(),
    'geometry': pa.string(),
    'times': pa.list_(pa.timestamp('us', tz='UTC')),
    'coordinates': pa.list_(pa.string()),
    **{
        column: pa.list_(pa.float64())
        for column in ('sog', 'cog', 'speed', 'direction', 'acceleration')
    },
    **{column: pa.bool_() for column in ('anchorage_areas', 'port_entrance', 'berths')},
}
ddf_i.to_parquet(
    f"{path_name}/all_merged_sorted_trajectories_comprised",
    storage_options=dict(account_name="rwsais", sas_token=sas_token),
    schema=scheme_information,
    engine='pyarrow',
)