In [1]:
import pandas as pd
import numpy as np
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from tqdm import tqdm
tqdm.pandas()

  from pandas import Panel


In [2]:
raw_trajectory = pd.read_parquet('data/trajectory/sorted_trajectory.parquet')
raw_trajectory.loc[raw_trajectory['longitude'] < 11.5, 'longitude'] = raw_trajectory['longitude'] * 10
raw_trajectory.loc[raw_trajectory['latitude'] < 2.4, 'latitude'] = raw_trajectory['latitude'] * 10

In [3]:
od = dd.read_csv('data/01.出租车/出租车交易数据/TAXIMETERS_DEAL_2017_06_*', header=None, usecols=[0, 1, 2],
                 names=['id', 'begin_time', 'end_time'])
od['begin_time'] = dd.to_datetime(od['begin_time'], errors='coerce')
od['end_time'] = dd.to_datetime(od['end_time'], errors='coerce')
od = od.dropna().compute()

In [4]:
# 1. Add occupied_from_od flag
def search_ranges(range_record, trajectories, range_id='Licence', trajectory_id='plate',
                  begin_time='begin_time', end_time='end_time'):
    _id = range_record[range_id]
    _s_t, _e_t = range_record[begin_time], range_record[end_time]
    _id_s = trajectories[trajectory_id].searchsorted(_id, side='left')
    _id_e = trajectories[trajectory_id].searchsorted(_id, side='right')
    _temp = trajectories['timestamp'].iloc[_id_s: _id_e]
    _s_i = _id_s + _temp.searchsorted(_s_t, side='left')
    _e_i = _id_s + _temp.searchsorted(_e_t, side='right')
    return _s_i, _e_i

ranges = od.progress_apply(search_ranges, axis=1, args=(raw_trajectory,), range_id='id', trajectory_id='license',
                           begin_time='begin_time', end_time='end_time')
occupied_idx = np.concatenate([np.array(list(range(_a, _b))) for _a, _b in ranges]).astype(int)
raw_trajectory['occupied_from_od'] = False
raw_trajectory.iloc[occupied_idx, -1] = True
len(raw_trajectory.loc[raw_trajectory['occupied_from_od'] & ~raw_trajectory['occupied']].index
    ) / len(raw_trajectory.loc[raw_trajectory['occupied_from_od']].index)

100%|██████████| 21420023/21420023 [1:49:44<00:00, 3252.96it/s]


0.28791410091162895

In [5]:
# 2.3. Add dis2pre, dur2pre flag
def haversine_np(lon1, lat1, lon2, lat2, miles=False):
    AVG_EARTH_RADIUS = 6371.0088  # in km
    lon1, lat1, lon2, lat2 = map(np.radians, [lon1, lat1, lon2, lat2])
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    d = np.sin(dlat * 0.5) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon * 0.5) ** 2
    h = 2 * AVG_EARTH_RADIUS * np.arcsin(np.sqrt(d))
    if miles:
        return h * 0.621371  # in miles
    else:
        return h  # in kilometers
raw_trajectory['dis2pre'] = haversine_np(raw_trajectory['longitude'].shift(), raw_trajectory['latitude'].shift(), 
                                         raw_trajectory['longitude'], raw_trajectory['latitude']).astype(np.float32)
raw_trajectory['dur2pre'] = (raw_trajectory['timestamp']
                             - raw_trajectory['timestamp'].shift()).dt.total_seconds().astype(np.float32)
raw_trajectory.loc[raw_trajectory['license'] != raw_trajectory['license'].shift(), ['dis2pre', 'dur2pre']] = None

In [6]:
# 4. Add big_dur flag
raw_trajectory['big_dur'] = raw_trajectory['dur2pre'] > 1800
# 5. Add valid flag
raw_trajectory['valid'] = ~(raw_trajectory['big_dur'] & (raw_trajectory['dis2pre'] > 0.5))
# 6. Add stop flag
raw_trajectory['stop'] = (((raw_trajectory['speed'] == 0) & ~raw_trajectory['big_dur']) |
                          (raw_trajectory['big_dur'] & raw_trajectory['valid']))

In [15]:
raw_trajectory.astype({'speed': np.int16, 'occupied': bool, 'longitude': np.float32, 'latitude': np.float32}
                      ).to_parquet('data/trajectory/statuses_wo_charging_resting')