In [1]:
import pandas as pd
from geopandas import GeoDataFrame, read_file
import sys
sys.path.append("..")
import movingpandas as mpd
mpd.show_versions()
import warnings
warnings.simplefilter("ignore")

Dask dataframe query planning is disabled because dask-expr is not installed.

You can install it with `pip install dask[dataframe]` or `conda install dask`.
This will raise in a future version.




MovingPandas 0.17.2

SYSTEM INFO
-----------
python     : 3.9.19 | packaged by conda-forge | (main, Mar 20 2024, 12:50:21)  [GCC 12.3.0]
executable : /home/arg/miniconda3/envs/movingpandas/bin/python
machine    : Linux-5.15.0-105-generic-x86_64-with-glibc2.31

GEOS, GDAL, PROJ INFO
---------------------
GEOS       : None
GEOS lib   : None
GDAL       : 3.8.4
GDAL data dir: /home/arg/miniconda3/envs/movingpandas/share/gdal
PROJ       : 9.3.1
PROJ data dir: /home/arg/miniconda3/envs/movingpandas/share/proj

PYTHON DEPENDENCIES
-------------------
geopandas  : 0.14.3
pandas     : 2.2.1
fiona      : 1.9.6
numpy      : 1.26.4
shapely    : 2.0.3
rtree      : 1.2.0
pyproj     : 3.6.1
matplotlib : 3.8.3
mapclassify: 2.6.1
geopy      : 2.4.1
holoviews  : 1.18.3
hvplot     : 0.9.2
geoviews   : 1.11.1
stonesoup  : 1.2


In [None]:
import csv
import os
import shutil
import xml.etree.ElementTree as ET
from datetime import timedelta
import numpy as np
from dateutil import parser

global trajectory_id
def generate_grid_and_path(x_min, x_max, x_n, y_min, y_max, y_n):
    """
    Build a two-pass serpentine path over a regular grid.

    The grid nodes are the combinations of ``x_n`` evenly spaced x-values in
    [x_min, x_max] and ``y_n`` evenly spaced y-values in [y_min, y_max].
    The path sweeps the grid column by column first, then row by row, so
    every node appears twice in the result.

    Parameters:
    - x_min (float): Smallest x-coordinate of the grid.
    - x_max (float): Largest x-coordinate of the grid.
    - x_n (int): Number of grid columns (distinct x-values).
    - y_min (float): Smallest y-coordinate of the grid.
    - y_max (float): Largest y-coordinate of the grid.
    - y_n (int): Number of grid rows (distinct y-values).

    Returns:
    - path (list): ``2 * x_n * y_n`` coordinate tuples ``(x, y)``.

    """
    xs = np.linspace(x_min, x_max, x_n)
    ys = np.linspace(y_min, y_max, y_n)
    path = []

    # Pass 1: column sweep. Even columns run bottom-to-top, odd columns run back down.
    for col in range(x_n):
        row_order = range(y_n) if col % 2 == 0 else range(y_n - 1, -1, -1)
        path.extend((xs[col], ys[row]) for row in row_order)

    # Pass 2: row sweep, starting on the row where pass 1 finished
    # (the bottom row when x_n is even, the top row when it is odd).
    rows = range(y_n) if x_n % 2 == 0 else range(y_n - 1, -1, -1)
    for row in rows:
        # Even rows run left-to-right, odd rows run right-to-left.
        col_order = range(x_n) if row % 2 == 0 else range(x_n - 1, -1, -1)
        path.extend((xs[col], ys[row]) for col in col_order)

    return path


def generate_circle_path(x_center, y_center, radius, num_points):
    """
    Sample points along a circle.

    Angles are spaced evenly from 0 to 2*pi with both ends included, so for
    ``num_points >= 2`` the last point lands back on the first one (up to
    floating-point rounding) and the path is closed.

    Parameters:
    - x_center (float): x-coordinate of the circle centre.
    - y_center (float): y-coordinate of the circle centre.
    - radius (float): circle radius, in the same units as the centre.
    - num_points (int): how many points to return.

    Returns:
    - path (list): ``num_points`` tuples ``(x, y)`` on the circle.
    """
    path = []
    for angle in np.linspace(0, 2*np.pi, num_points):
        point_x = x_center + radius * np.cos(angle)
        point_y = y_center + radius * np.sin(angle)
        path.append((point_x, point_y))
    return path

def parse_corrdinates_str_list(kml_path):
    """
    Parses a KML file and extracts the coordinates from the LineString elements.

    Only Placemarks that have a direct ``name`` child are considered. For each
    of them, the first ``LineString`` found below it contributes the text of
    its direct ``coordinates`` child. Tags are matched in any XML namespace
    (or none), so both bare KML and files declaring the usual
    ``xmlns="http://www.opengis.net/kml/2.2"`` are handled. ``coordinates``
    elements with no text at all are skipped.

    Args:
        kml_path (str): The path to the KML file.

    Returns:
        list: The stripped coordinate strings, in document order. Empty when
            no matching element exists.
    """
    root = ET.parse(kml_path).getroot()
    coordinates_str_list = []
    # "{*}tag" is ElementTree's namespace wildcard (Python 3.8+): it matches
    # the tag whether or not the document declares a default namespace.
    for placemark in root.findall(".//{*}Placemark"):
        if placemark.find("{*}name") is None:
            continue
        line_string = placemark.find(".//{*}LineString")
        if line_string is None:
            continue
        coordinates = line_string.find("{*}coordinates")
        # An empty <coordinates/> element has text None; nothing to extract.
        if coordinates is None or coordinates.text is None:
            continue
        coordinates_str_list.append(coordinates.text.strip())
    return coordinates_str_list

def get_means(coordinates):
    """
    Average the first two components of a sequence of coordinates.

    Parameters:
    coordinates (list): Coordinate sequences whose element 0 is x and element 1 is y;
        any further elements (e.g. altitude) are ignored.

    Returns:
    tuple: ``(mean_x, mean_y)``.

    """
    # One pass per axis: index 0 collects the x-values, index 1 the y-values.
    return tuple(np.mean([point[axis] for point in coordinates]) for axis in (0, 1))


def write_to_csv(
    csv_file_path, coordinates, trajectory_id, tracker, base_timestamp, mean_x, mean_y, grid_size_half=10e-5, xy_n=10,
    circle_radius=0.00005, circle_points=10,
):
    """
    Appends one synthetic circular trajectory to a semicolon-delimited CSV file.

    The rows written are NOT the input ``coordinates``: they are points on a
    circle centred on ``(mean_x, mean_y)``. ``coordinates`` only determines the
    starting index: the first row gets index ``len(coordinates) + 1``.

    Each row is ``[x, y, i, i, i, trajectory_id, tracker, timestamp]``, where
    ``i`` is the running index and ``timestamp`` is ``base_timestamp`` plus
    ``i - 1`` seconds in ISO 8601 format. No header row is written.

    Args:
        csv_file_path (str): The path to the CSV file (opened in append mode).
        coordinates (list): Source coordinates; only their count is used.
        trajectory_id (int): The ID of the trajectory.
        tracker (str): The tracker name.
        base_timestamp (str): The base timestamp for generating timestamps.
        mean_x (float): x-coordinate of the circle centre.
        mean_y (float): y-coordinate of the circle centre.
        grid_size_half (float, optional): Currently unused. Defaults to 10e-5.
        xy_n (int, optional): Currently unused. Defaults to 10.
        circle_radius (float, optional): Radius of the generated circle, in the
            same units as ``mean_x``/``mean_y``. Defaults to 0.00005.
        circle_points (int, optional): Number of points (rows) generated.
            Defaults to 10.
    """
    base_time = parser.parse(base_timestamp)
    path = generate_circle_path(mean_x, mean_y, circle_radius, circle_points)
    first_index = len(coordinates) + 1

    with open(csv_file_path, "a", newline="") as csv_file:
        writer = csv.writer(csv_file, delimiter=";")
        for i, (x, y) in enumerate(path, start=first_index):
            # One second per index step, counted from index 1.
            timestamp = (base_time + timedelta(seconds=i - 1)).isoformat()
            writer.writerow([x, y, i, i, i, trajectory_id, tracker, timestamp])


def convert_kml_to_csv(kml_path):
    """
    Appends one synthetic trajectory derived from a KML file to ./data/ALL.csv.

    The first LineString of the file is parsed into float coordinates, their
    mean x/y is computed, and ``write_to_csv`` is called with that centre.
    Progress is printed as ``<kml name> <count>``.

    Relies on two module-level globals that must exist before the call and
    are updated by it: ``count`` (files processed so far) and
    ``trajectory_id`` (id given to the trajectory, incremented afterwards).

    Args:
        kml_path (str): The path to the KML file.

    Returns:
        None

    Raises:
        IndexError: If the file contains no usable LineString coordinates.
        ValueError: If a coordinate component cannot be converted to float.
    """
    global trajectory_id
    global count
    # Text before the first "." of the file name; used for the progress print only.
    kml_name = os.path.basename(kml_path).split(".")[0]

    line_strings = parse_corrdinates_str_list(kml_path)
    if not line_strings:
        # Same exception type as indexing the empty list, but naming the offending file.
        raise IndexError(f"No LineString coordinates found in {kml_path}")
    print(kml_name, count)
    count += 1

    # Only the first LineString is used. Its text is whitespace-separated
    # points, each a comma-separated list of numbers.
    coordinates = [[float(value) for value in point.split(",")] for point in line_strings[0].split()]
    mean_x, mean_y = get_means(coordinates)

    xy_n = 61
    # NOTE(review): presumably xy_n metres expressed in degrees (1 degree ~ 111,111 m) - confirm.
    grid_size_half = 1 / 111_111 * xy_n

    tracker = 19
    base_timestamp = "2008-12-11 04:42:14+00"
    csv_file_path = "./data/ALL.csv"

    write_to_csv(
        csv_file_path,
        coordinates,
        trajectory_id,
        tracker,
        base_timestamp,
        mean_x,
        mean_y,
        grid_size_half,
        xy_n=xy_n,
    )
    trajectory_id = trajectory_id + 1

def get_specific_filenames(directory, extension):
    """
    Get a sorted list of filenames in a directory that have a specific extension.

    ``os.listdir`` returns entries in arbitrary order, so the result is sorted
    to make anything that depends on file order reproducible between runs
    and machines.

    Parameters:
    directory (str): The directory path to search for filenames.
    extension (str): The suffix a filename must end with (e.g. ".kml").

    Returns:
    list: Matching entry names (without the directory part), sorted ascending.
    """
    return sorted(filename for filename in os.listdir(directory) if filename.endswith(extension))

directory_path = "./data/raw_kml"
file_extension = ".kml"

# Full paths of all KML files in the raw-data directory.
kml_pathes = [
    f"{directory_path}/{kml_name}"
    for kml_name in get_specific_filenames(directory_path, file_extension)
]

# Globals read and updated by convert_kml_to_csv.
trajectory_id = 1
count = 0

# Mode "w" creates the file or truncates a previous run's output, so the cell
# also works on a fresh checkout where ALL.csv does not exist yet.
with open("./data/ALL.csv", "w", newline="") as csv_file:
    writer = csv.writer(csv_file, delimiter=";")
    headers = ["X", "Y", "fid", "id", "sequence", "trajectory_id", "tracker", "t"]
    writer.writerow(headers)

# Each call appends one trajectory's rows below the header.
for kml_path in kml_pathes:
    convert_kml_to_csv(kml_path)

In [4]:
import geoviews as gv

# Basemap tile source built from the virtualearth.net tile URL template.
bing_maps_tile_source = gv.tile_sources.WMTS("http://ecn.t3.tiles.virtualearth.net/tiles/a{q}.jpeg?g=1")

# Keyword arguments shared by the hvplot calls in this notebook.
hvplot_defaults = dict(
    tiles=bing_maps_tile_source,
    frame_height=700,
    frame_width=700,
    colorbar=True,
)

In [5]:
%%time 
# Load the semicolon-separated point table written to data/ALL.csv.
df = pd.read_csv("data/ALL.csv", sep=";")
# Group the rows into trajectories by the "trajectory_id" column, taking
# time from column "t" and position from columns "X" / "Y".
traj_collection = mpd.TrajectoryCollection(
    df,
    "trajectory_id",
    t="t",
    x="X",
    y="Y",
)


CPU times: user 25.3 s, sys: 672 ms, total: 26 s
Wall time: 26 s


In [None]:
# Plot all trajectories on the shared basemap; the collection's string form is the title.
traj_collection.hvplot(
    title=str(traj_collection),
    line_width=[5, 0.8],
    **hvplot_defaults,
)

![Screenshot of the trajectory collection plot](./img/6000.png)
