In [1]:
import geopandas as gp
import movingpandas as mpd
from datetime import timedelta
from pyproj import CRS
import pandas as pd
from tqdm import tqdm
from joblib import Parallel, delayed
import logging
from shapely import Point
from datetime import datetime
logging.basicConfig(filename='log_read_freemove.log', level=logging.DEBUG)

In [2]:
print('Reading data...')
raw_points_gdf = gp.read_file('../data/freemove/raw_full.geojson', crs='EPSG:4326')
print('Data read.')

Reading data...
Data read.


In [3]:
# create geojson of points
rows = []
for i, row in tqdm(raw_points_gdf.iterrows(), total=len(raw_points_gdf), miniters=10):
    person_id = row.PERSON_ID
    trip_id = row.TRIP_ID
    leg_id = row.LEG_ID
    leg_start = datetime.strptime(str(row.LEG_START), '%Y-%m-%dT%H:%M:%S')
    leg_end = datetime.strptime(str(row.LEG_END), '%Y-%m-%dT%H:%M:%S')
    trip_purpose_ids = row.TRIP_PURPOSE_IDS
    traffic_mode = row.TRAFFIC_MODE
    leg_len_in_mtrs = row.LEG_LEN_IN_MTRS
    leg_duration_in_secs = row.LEG_DURATION_IN_SECS

    # interpolate time of points of a leg since we only have start and end time
    total_points_of_leg = len(row.geometry.coords)
    interpolattion_increment = leg_duration_in_secs / total_points_of_leg

    for point_i, point in enumerate(row.geometry.coords):
        new_row = {"PERSON_ID": person_id, 
                   "TRIP_ID": trip_id, 
                   "LEG_ID": leg_id, 
                   "LEG_START": leg_start, 
                   "LEG_END": leg_end,
                   "TIME": leg_start + timedelta(seconds=round(interpolattion_increment * point_i, ndigits=0)),
                   "TRIP_PURPOSE_IDS": trip_purpose_ids, 
                   "TRAFFIC_MODE": traffic_mode, 
                   "LEG_LEN_IN_MTRS": leg_len_in_mtrs, 
                   "LEG_DURATION_IN_SECS": leg_duration_in_secs, 
                   "LAT": Point(point).y,
                   "LON": Point(point).x}
        rows.append(new_row)


raw_points_gdf = pd.DataFrame(rows)[["PERSON_ID", "TRIP_ID", "TIME", "LAT", "LON"]]
raw_points_gdf

100%|██████████████████████████████████████████████████████████████████████████████| 4958/4958 [00:42<00:00, 115.99it/s]


Unnamed: 0,PERSON_ID,TRIP_ID,TIME,LAT,LON
0,16348,985222,2022-10-31 09:15:57,52.454451,13.504967
1,16348,985222,2022-10-31 09:15:58,52.454463,13.504983
2,16348,985222,2022-10-31 09:15:59,52.454454,13.505009
3,16348,985222,2022-10-31 09:16:00,52.454422,13.505042
4,16348,985222,2022-10-31 09:16:01,52.454391,13.505098
...,...,...,...,...,...
1637556,17273,1012443,2022-11-15 10:00:46,52.501937,13.355693
1637557,17273,1012443,2022-11-15 10:00:47,52.501903,13.355708
1637558,17273,1012443,2022-11-15 10:00:48,52.501874,13.355723
1637559,17273,1012443,2022-11-15 10:00:49,52.501847,13.355738


In [4]:
# rename columns to match geolife point dataset
raw_points_gdf = raw_points_gdf.rename(columns={'TRIP_ID': 'traj_id', 
                                                'TIME': 'time', 
                                                'LON': 'lon',
                                                'LAT': 'lat',
                                                'PERSON_ID': 'user'}).reset_index(drop=True)

In [5]:
def write_geojson(gdf, path):
    assert isinstance(gdf, gp.GeoDataFrame)
    gdf.to_file(path, driver='GeoJSON')



In [6]:
print('removing points outside valid lon and lat range...')
# Remove points that fall outside the valid lon and lat range (-90 to 90 for latitude and -180 to 180 for longitude)
raw_points_gdf = raw_points_gdf[(raw_points_gdf.lat >= -90) & (raw_points_gdf.lat <= 90) & (raw_points_gdf.lon >= -180) & (raw_points_gdf.lon <= 180)]
print('points removed.')

removing points outside valid lon and lat range...
points removed.


In [7]:
print('Dropping NA values...')
len_before = len(raw_points_gdf)
logging.info('Rows before dropping NA values: {}'.format(len(raw_points_gdf)))
raw_points_gdf.dropna(inplace=True)
logging.info('Rows after dropping NA values: {}'.format(len(raw_points_gdf)))
logging.info('Rows dropped: {}'.format(len_before - len(raw_points_gdf)))
print('NA values dropped.')

Dropping NA values...
NA values dropped.


In [8]:
print('creating trajectory collection...')
# Create trajectory collection
raw_full_trip_collection = mpd.TrajectoryCollection(raw_points_gdf, traj_id_col='traj_id', obj_id_col ='user', t='time', x='lon', y='lat')
print('trajectory collection created.')

logging.info(f'This is a test log of traj id: {raw_full_trip_collection.trajectories[0].id}')
logging.info(f'Number of trajectories in data: {len(raw_full_trip_collection.trajectories)}')

creating trajectory collection...
trajectory collection created.


In [9]:
# Convert to EPSG
def convert_epsg(traj):
    result = traj.to_crs(CRS(3035))
    result.obj_id = traj.obj_id
    return result
print('converting to EPSG:3035...')
raw_full_trip_collection.trajectories = Parallel(n_jobs=-2, verbose=10)(delayed(convert_epsg)(traj) for traj in raw_full_trip_collection.trajectories)
print('converted to EPSG:3035 (Berlin).')

converting to EPSG:3035...


[Parallel(n_jobs=-2)]: Using backend LokyBackend with 127 concurrent workers.
[Parallel(n_jobs=-2)]: Done  11 tasks      | elapsed:    5.3s
[Parallel(n_jobs=-2)]: Done  34 tasks      | elapsed:    5.9s
[Parallel(n_jobs=-2)]: Done  59 tasks      | elapsed:    6.6s
[Parallel(n_jobs=-2)]: Done  84 tasks      | elapsed:    7.3s
[Parallel(n_jobs=-2)]: Done 111 tasks      | elapsed:    8.4s
[Parallel(n_jobs=-2)]: Done 138 tasks      | elapsed:    9.0s
[Parallel(n_jobs=-2)]: Done 167 tasks      | elapsed:    9.9s
[Parallel(n_jobs=-2)]: Done 196 tasks      | elapsed:   10.9s
[Parallel(n_jobs=-2)]: Done 227 tasks      | elapsed:   11.5s
[Parallel(n_jobs=-2)]: Done 258 tasks      | elapsed:   12.1s
[Parallel(n_jobs=-2)]: Done 291 tasks      | elapsed:   12.9s
[Parallel(n_jobs=-2)]: Done 324 tasks      | elapsed:   13.6s
[Parallel(n_jobs=-2)]: Done 359 tasks      | elapsed:   14.2s
[Parallel(n_jobs=-2)]: Done 394 tasks      | elapsed:   15.0s
[Parallel(n_jobs=-2)]: Done 431 tasks      | elapsed: 

converted to EPSG:3035 (Berlin).


In [10]:
# Split trajectories

def split_traj(traj, MAX_DIAMETER=100, MIN_DURATION=timedelta(minutes=15), MIN_LENGTH=500):
    try:
        split = mpd.StopSplitter(traj).split(max_diameter=MAX_DIAMETER, min_duration=MIN_DURATION, min_length=MIN_LENGTH)
        for i in range(len(split.trajectories)):
            split.trajectories[i].obj_id = traj.obj_id
        return split.trajectories
    except BaseException as e:
        print(e, 'Error at traj: ', traj.id)
        logging.warning(f'{e} Error at traj: {traj.id}')

        return []

print('splitting trajectories...')
# split_trajs = Parallel(n_jobs=4, verbose=10)(delayed(split_traj)(traj) for traj in geolife_raw_collection.trajectories)
split_trajs = []
for traj in tqdm(raw_full_trip_collection.trajectories):
    try:
        split_trajs.append(split_traj(traj))
    except BaseException as e:
        print(e, 'Error at traj: ', traj.id)
        continue
    
split_trajs = [traj for sublist in split_trajs for traj in sublist]
freemove_splitted_collection = mpd.TrajectoryCollection(split_trajs)
print('trajectories split.')

splitting trajectories...


100%|█████████████████████████████████████████████████████████████████████████████| 1408/1408 [1:21:19<00:00,  3.47s/it]


trajectories split.


In [19]:
# Smooth trajectories
def smooth_traj(traj, PROCESS_NOISE_STD=0.1, MEASUREMENT_NOISE_STD=10):
    try:
        result = mpd.KalmanSmootherCV(traj).smooth(process_noise_std=PROCESS_NOISE_STD, measurement_noise_std=MEASUREMENT_NOISE_STD)
        result.obj_id = traj.obj_id
        return result
    except BaseException as e:
        print(e, 'Error at traj: ', traj.id)
        logging.warning(f'{e} Error at traj: {traj.id}')
        return traj

print('smoothing trajectories...')
# geolife_splitted_smooth_collection = Parallel(n_jobs=4, verbose=10)(delayed(smooth_traj)(traj) for traj in geolife_splitted_collection.trajectories)
freemove_smooth_collection = []
for traj in tqdm(raw_full_trip_collection.trajectories):
    try:
        freemove_smooth_collection.append(smooth_traj(traj))
    except BaseException as e:
        print(e, 'Error at traj: ', traj.id)
        continue

freemove_smooth_collection = mpd.TrajectoryCollection(freemove_smooth_collection)
print('trajectories smoothed.')

smoothing trajectories...


100%|████████████████████████████████████████████████████████████████████████████| 1408/1408 [00:00<00:00, 31458.84it/s]

module 'movingpandas' has no attribute 'KalmanSmootherCV' Error at traj:  978933
module 'movingpandas' has no attribute 'KalmanSmootherCV' Error at traj:  978934
module 'movingpandas' has no attribute 'KalmanSmootherCV' Error at traj:  978946
module 'movingpandas' has no attribute 'KalmanSmootherCV' Error at traj:  978955
module 'movingpandas' has no attribute 'KalmanSmootherCV' Error at traj:  980322
module 'movingpandas' has no attribute 'KalmanSmootherCV' Error at traj:  985011
module 'movingpandas' has no attribute 'KalmanSmootherCV' Error at traj:  985073
module 'movingpandas' has no attribute 'KalmanSmootherCV' Error at traj:  985091
module 'movingpandas' has no attribute 'KalmanSmootherCV' Error at traj:  985112
module 'movingpandas' has no attribute 'KalmanSmootherCV' Error at traj:  985118
module 'movingpandas' has no attribute 'KalmanSmootherCV' Error at traj:  985127
module 'movingpandas' has no attribute 'KalmanSmootherCV' Error at traj:  985179
module 'movingpandas' has no




KeyboardInterrupt: 

In [12]:
# Generalize trajectories
def generalize_traj(traj, TOLERANCE=1.0):
    try:
        result = mpd.DouglasPeuckerGeneralizer(traj).generalize(tolerance=TOLERANCE)
        result.obj_id = traj.obj_id
        return result
    except BaseException as e:
        print(e, 'Error at traj: ', traj.id)
        logging.warning(f'{e} Error at traj: {traj.id}')
        return traj

# Douglas-Peucker generalization for non-smoothed trajectories
print('generalizing trajectories... (1)')
logging.info('generalizing trajectories... (1)')
# geolife_splitted_generalized_collection = Parallel(n_jobs=4, verbose=10)(delayed(generalize_traj)(traj) for traj in geolife_splitted_collection.trajectories)
freemove_generalized_collection = []
for traj in tqdm(raw_full_trip_collection.trajectories):
    try:
        freemove_generalized_collection.append(generalize_traj(traj))
    except BaseException as e:
        print(e, 'Error at traj: ', traj.id)
        continue
freemove_generalized_collection = mpd.TrajectoryCollection(freemove_generalized_collection)
print('trajectories generalized. (1)')
logging.info('trajectories generalized. (1)')

generalizing trajectories... (1)


 66%|█████████████████████████████████████████████████████▏                          | 936/1408 [03:33<02:07,  3.71it/s]

 Error at traj:  1006129


 68%|██████████████████████████████████████████████████████▎                         | 955/1408 [03:38<01:38,  4.61it/s]

 Error at traj:  1006176


100%|███████████████████████████████████████████████████████████████████████████████| 1408/1408 [05:45<00:00,  4.08it/s]


trajectories generalized. (1)


In [13]:
# Douglas-Peucker generalization for smoothed trajectories
print('generalizing trajectories... (2)')
logging.info('generalizing trajectories... (2)')
# geolife_splitted_smooth_generalized_collection = Parallel(n_jobs=4, verbose=10)(delayed(generalize_traj)(traj) for traj in geolife_splitted_smooth_collection.trajectories)
freemove_smooth_generalized_collection = []
for traj in tqdm(freemove_smooth_collection.trajectories):
    try:
        freemove_smooth_generalized_collection.append(generalize_traj(traj))
    except BaseException as e:
        print(e, 'Error at traj: ', traj.id)
        continue

freemove_smooth_generalized_collection = mpd.TrajectoryCollection(freemove_smooth_generalized_collection)
print('trajectories generalized. (2)')
logging.info('trajectories generalized. (2)')

generalizing trajectories... (2)


100%|███████████████████████████████████████████████████████████████████████████████| 1408/1408 [05:45<00:00,  4.07it/s]


trajectories generalized. (2)


In [20]:
def convert_trajcollection_to_gdf(trajcollection):
    gdfs = []
    for traj in tqdm(trajcollection.trajectories):
        traj_gdf = traj.to_traj_gdf()
        traj_gdf['user_id'] = traj.obj_id
        gdfs.append(traj_gdf)

    gdf = gp.GeoDataFrame(pd.concat(gdfs), crs='EPSG:3035')

    return gp.GeoDataFrame(pd.concat(gdfs), crs='EPSG:3035')



In [21]:
logging.info('converting to gdf...')
freemove_raw = convert_trajcollection_to_gdf(raw_full_trip_collection)
freemove_splitted = convert_trajcollection_to_gdf(freemove_splitted_collection)
freemove_smooth = convert_trajcollection_to_gdf(freemove_smooth_collection)
freemove_generalized = convert_trajcollection_to_gdf(freemove_generalized_collection)
freemove_smooth_generalized = convert_trajcollection_to_gdf(freemove_smooth_generalized_collection)
logging.info('converted to gdf.')

100%|███████████████████████████████████████████████████████████████████████████████| 1408/1408 [00:33<00:00, 41.97it/s]


In [22]:
# Write gdf to pickle file to load fast for further processing
logging.info('writing to geojson...')
write_geojson(freemove_raw, '../data/freemove/freemove_raw.geojson')
write_geojson(freemove_splitted, '../data/freemove/freemove_splitted.geojson')
write_geojson(freemove_smooth, '../data/freemove/freemove_smooth.geojson')
write_geojson(freemove_generalized, '../data/freemove/freemove_generalized.geojson')
write_geojson(freemove_smooth_generalized, '../data/freemove/freemove_smooth_generalized.geojson')

# Save point gdf as geojson
freemove_point_gdf = raw_full_trip_collection.to_point_gdf().reset_index()
write_geojson(freemove_point_gdf, '../data/freemove/freemove_raw_point.geojson')
freemove_smoothed_generalized_point = freemove_smooth_generalized_collection.to_point_gdf().reset_index()
write_geojson(freemove_smoothed_generalized_point, '../data/freemove/freemove_smooth_generalized_point.geojson')
logging.info('written to geojson.')


  pd.Int64Index,
  pd.Int64Index,
