In [None]:
import pandas as pd
import numpy as np
from geopy.distance import geodesic
from tqdm import tqdm

# Waypoint export used for the first feature run (the data originally shown to infeo).
# Alternative inputs that were tried:
#   "../data/filtered_waypoints_new.parquet"
#   "../../Data_Preperation/filtered_data_full_1.parquet"
df = pd.read_parquet("../data/filtered_waypoints.parquet")

# Order the waypoints along each tracking so that consecutive rows form
# consecutive segments.
df = df.sort_values(by=["id_tracking", "sequence"]).reset_index(drop=True)

grouped = df.groupby("id_tracking")

print(len(grouped))

# Preview the first rows of the frame only; grouped.head() would print five
# rows for every single tracking.
print(df.head())

# Resolve the optional tracking-level columns once instead of once per group.
has_duration = "tracking_duration" in df.columns
has_length = "tracking_length" in df.columns

features = []

for tracking_id, group in tqdm(grouped, desc="Extracting features"):
    group = group.dropna(subset=["latitude", "longitude"])
    coords = list(zip(group["latitude"], group["longitude"]))

    # A tracking with fewer than two usable points has no segment to measure.
    if len(coords) < 2:
        continue

    num_points = len(coords)

    # Bounding-box extent, expressed in the units of the coordinate columns
    # (not a metric area).
    lat_span = group["latitude"].max() - group["latitude"].min()
    lon_span = group["longitude"].max() - group["longitude"].min()
    bbox_area = lat_span * lon_span

    # The 1e-6 term keeps the ratio finite when the bounding box is degenerate.
    point_density = num_points / (bbox_area + 1e-6)

    # Geodesic length in meters of every segment between consecutive waypoints.
    dists = [geodesic(start, end).meters for start, end in zip(coords, coords[1:])]
    avg_segment_distance = np.mean(dists)

    num_stops = (group["speed"] == 0).sum()

    # Assign both values on every iteration: leaving them unset when a column
    # is missing would raise NameError on the first group or silently reuse the
    # previous tracking's value. Series.max() also skips NaN, unlike builtin max().
    duration = group["tracking_duration"].max() if has_duration else None
    length = group["tracking_length"].max() if has_length else None

    features.append({
        "tracking_id": tracking_id,
        "num_points": num_points,
        "bbox_area": bbox_area,
        "point_density": point_density,
        "avg_segment_distance": avg_segment_distance,
        "num_stops": num_stops,
        "duration": duration,
        "length": length
    })

features_df = pd.DataFrame(features)
features_df.to_csv("tracking_features.csv", index=False)
print("Feature extraction complete. Saved to tracking_features.csv")


: 

In [1]:
import pandas as pd
import numpy as np
from geopy.distance import geodesic
from tqdm import tqdm
import csv

# pd.read_parquet returns a fully materialised DataFrame, which has no
# iter_batches() method, so the frame is grouped directly. Grouping the whole
# frame at once also guarantees that each tracking is seen exactly once; slicing
# the rows into fixed-size batches would split a tracking across batches and
# write several partial rows for the same tracking_id.
df = pd.read_parquet("../../Data_Preperation/filtered_data_full_1.parquet")

# Order the waypoints along each tracking so that consecutive rows form
# consecutive segments.
df = df.sort_values(by=["id_tracking", "sequence"])

# Resolve the optional tracking-level columns once instead of once per group.
has_duration = "tracking_duration" in df.columns
has_length = "tracking_length" in df.columns

# Rows are written as soon as they are computed, so the feature table itself
# never has to be held in memory.
with open("tracking_features_full.csv", "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=[
        "tracking_id", "num_points", "bbox_area", "point_density",
        "avg_segment_distance", "num_stops", "duration", "length"
    ])
    writer.writeheader()

    for tracking_id, group in df.groupby("id_tracking"):
        group = group.dropna(subset=["latitude", "longitude"])
        coords = list(zip(group["latitude"], group["longitude"]))

        # A tracking with fewer than two usable points has no segment to measure.
        if len(coords) < 2:
            continue

        # Bounding-box extent, expressed in the units of the coordinate columns
        # (not a metric area).
        lat_span = group["latitude"].max() - group["latitude"].min()
        lon_span = group["longitude"].max() - group["longitude"].min()
        bbox_area = lat_span * lon_span

        # The 1e-6 term keeps the ratio finite when the bounding box is degenerate.
        point_density = len(coords) / (bbox_area + 1e-6)

        # Geodesic length in meters of every segment between consecutive waypoints.
        dists = [geodesic(start, end).meters for start, end in zip(coords, coords[1:])]
        avg_segment_distance = np.mean(dists)

        num_stops = (group["speed"] == 0).sum()

        # None is written as an empty CSV field when the column is absent.
        duration = group["tracking_duration"].max() if has_duration else None
        length = group["tracking_length"].max() if has_length else None

        writer.writerow({
            "tracking_id": tracking_id,
            "num_points": len(coords),
            "bbox_area": bbox_area,
            "point_density": point_density,
            "avg_segment_distance": avg_segment_distance,
            "num_stops": num_stops,
            "duration": duration,
            "length": length
        })


: 

In [1]:
import pandas as pd
import dask.dataframe as dd
import numpy as np
from geopy.distance import geodesic
from dask.diagnostics import ProgressBar

# Per-tracking feature extractor, written to be handed to a groupby-apply call.
# It receives the rows of one tracking at a time and works on them with plain pandas.
def extract_features_for_group(group):
    """Summarise the waypoints of a single tracking as a row of features.

    Parameters
    ----------
    group : pandas.DataFrame
        Rows of one tracking. The columns read here are ``sequence``,
        ``latitude``, ``longitude``, ``speed``, ``tracking_duration`` and
        ``tracking_length``.

    Returns
    -------
    pandas.Series or None
        ``None`` when fewer than two rows carry both coordinates; otherwise a
        Series with the entries ``num_points``, ``bbox_area``,
        ``point_density``, ``avg_segment_distance``, ``num_stops``,
        ``duration`` and ``length``.
    """
    # Put the rows in travel order first, then discard rows without a usable position.
    ordered = group.sort_values(by="sequence").dropna(subset=["latitude", "longitude"])
    waypoints = list(zip(ordered["latitude"], ordered["longitude"]))

    # Fewer than two positions means there is no segment to measure.
    if len(waypoints) < 2:
        return None

    point_count = len(waypoints)

    # Bounding-box extent in the units of the coordinate columns.
    latitudes = ordered["latitude"]
    longitudes = ordered["longitude"]
    box_area = (latitudes.max() - latitudes.min()) * (longitudes.max() - longitudes.min())

    # The 1e-6 term keeps the ratio finite when the bounding box is degenerate.
    density = point_count / (box_area + 1e-6)

    # Geodesic length in meters of every segment between consecutive waypoints.
    segment_lengths = [
        geodesic(start, end).meters
        for start, end in zip(waypoints[:-1], waypoints[1:])
    ]
    if segment_lengths:
        mean_segment = np.mean(segment_lengths)
    else:
        mean_segment = 0

    # Number of remaining rows whose recorded speed is exactly zero.
    stop_count = (ordered["speed"] == 0).sum()

    # Tracking-level values: take the largest value found in the group.
    total_duration = ordered["tracking_duration"].max()
    total_length = ordered["tracking_length"].max()

    return pd.Series({
        "num_points": point_count,
        "bbox_area": box_area,
        "point_density": density,
        "avg_segment_distance": mean_segment,
        "num_stops": stop_count,
        "duration": total_duration,
        "length": total_length
    })

# --- Main script execution ---

# 1. Open the Parquet dataset with Dask. Only the columns that the feature
#    function reads are requested, which keeps the amount of loaded data down.
columns_needed = [
    "id_tracking", "sequence", "latitude", "longitude", 
    "speed", "tracking_duration", "tracking_length"
]
df = dd.read_parquet(
    "../../Data_Preperation/filtered_data_full_1.parquet", 
    columns=columns_needed
)

# 2. Group by tracking ID
grouped = df.groupby("id_tracking")

# 3. Declare the structure of the output (Dask needs this beforehand).
#    The applied function returns one Series per group, so the combined result
#    is a DataFrame with one column per Series entry. A {column: dtype} mapping
#    describes that DataFrame; a pd.Series meta would declare a Series result.
#    The key order must match the order of the entries in the returned Series.
meta = {
    "num_points": "float64",
    "bbox_area": "float64",
    "point_density": "float64",
    "avg_segment_distance": "float64",
    "num_stops": "float64",
    "duration": "float64",
    "length": "float64",
}

# 4. Apply the function to each group and trigger the computation
print("Starting feature extraction with Dask...")
with ProgressBar():
    features_df = grouped.apply(extract_features_for_group, meta=meta).compute()

# 5. Save the final result (now a regular pandas DataFrame) to CSV.
#    The index holds the id_tracking values and is written as "tracking_id".
output_path = "tracking_features_new_test_gemini_1.csv"
features_df.to_csv(output_path, index_label="tracking_id")
# Report the file that was actually written.
print(f"\nFeature extraction complete. Saved to {output_path}")

Starting feature extraction with Dask...
[################################        ] | 80% Completed | 87m 0sss

: 