In [11]:
import glob
import time

import numpy as np
import polars as pl
from tqdm.autonotebook import tqdm

# Load every raw T-Drive log file (headerless CSV, columns id, ts, lon, lat) and stack
# them into one point table. Directory is presumably one file per taxi id — TODO confirm.
TDRIVE_DIR = "/home/liontao/work/data/tdrive/taxi_log_2008_by_id"

queries = []
skipped_files = []  # (path, exception) for every file that could not be parsed
# sorted(): glob order is filesystem-dependent; sorting makes the row order reproducible.
for file in tqdm(sorted(glob.glob(f"{TDRIVE_DIR}/*.txt"))):
    try:
        q = pl.read_csv(file, has_header=False, use_pyarrow=True, new_columns=["id", "ts", "lon", "lat"])
        queries.append(q)
    except Exception as exc:
        # Best effort (some files fail to parse), but record them instead of a bare
        # `except: pass`, which also swallowed KeyboardInterrupt and hid the losses.
        skipped_files.append((file, exc))

if skipped_files:
    print(f"skipped {len(skipped_files)} of {len(skipped_files) + len(queries)} files:")
    for path, exc in skipped_files[:10]:
        print(f"  {path}: {exc!r}")

if not queries:
    # pl.concat([]) would fail with an opaque message; say what actually went wrong.
    raise ValueError(f"no readable T-Drive files found under {TDRIVE_DIR}")

df_vertical_concat = pl.concat(
    queries,
    how="vertical",
)
df_vertical_concat

  0%|          | 0/10357 [00:00<?, ?it/s]

shape: (17_662_984, 4)
┌──────┬─────────────────────┬───────────┬──────────┐
│ id   ┆ ts                  ┆ lon       ┆ lat      │
│ ---  ┆ ---                 ┆ ---       ┆ ---      │
│ i64  ┆ datetime[ms]        ┆ f64       ┆ f64      │
╞══════╪═════════════════════╪═══════════╪══════════╡
│ 1    ┆ 2008-02-02 15:36:08 ┆ 116.51172 ┆ 39.92123 │
│ 1    ┆ 2008-02-02 15:46:08 ┆ 116.51135 ┆ 39.93883 │
│ 1    ┆ 2008-02-02 15:46:08 ┆ 116.51135 ┆ 39.93883 │
│ 1    ┆ 2008-02-02 15:56:08 ┆ 116.51627 ┆ 39.91034 │
│ …    ┆ …                   ┆ …         ┆ …        │
│ 9999 ┆ 2008-02-08 17:16:15 ┆ 116.28298 ┆ 39.9974  │
│ 9999 ┆ 2008-02-08 17:21:17 ┆ 116.28896 ┆ 39.99235 │
│ 9999 ┆ 2008-02-08 17:26:19 ┆ 116.28925 ┆ 39.98273 │
│ 9999 ┆ 2008-02-08 17:36:23 ┆ 116.26768 ┆ 39.90663 │
└──────┴─────────────────────┴───────────┴──────────┘


In [24]:
# Persist the concatenated raw point table (id, ts, lon, lat) as Parquet.
df_vertical_concat.write_parquet(file="tdrive.parquet", use_pyarrow=True)

In [22]:
# Collapse the point table into one row per taxi:
#   id     - taxi id
#   points - trajectory as "lon,lat;lon,lat;..." in timestamp order
#   size   - number of points in the trajectory
# Sorting once by (id, ts) and slicing each contiguous id run replaces the previous
# per-id `filter`, which rescanned all ~17.6M rows for each of ~10k ids.
# Rows come out in ascending id order (the old `unique()` order was unspecified).
sorted_points = df_vertical_concat.sort(["id", "ts"])
id_values = sorted_points["id"].to_numpy()
if len(id_values):
    # A new run starts wherever the id differs from the previous row.
    run_starts = np.concatenate(([0], np.flatnonzero(np.diff(id_values)) + 1))
else:
    run_starts = np.array([], dtype=np.int64)
run_ends = np.append(run_starts[1:], len(id_values))

ids = []
points = []
size = []
for run_start, run_end in tqdm(zip(run_starts.tolist(), run_ends.tolist()), total=len(run_starts)):
    single_traj = sorted_points.slice(run_start, run_end - run_start)
    ids.append(int(id_values[run_start]))
    points.append(
        ";".join(f"{lon},{lat}" for lon, lat in zip(single_traj["lon"].to_list(), single_traj["lat"].to_list()))
    )
    size.append(run_end - run_start)
dita_data = pl.DataFrame({"id": ids, "points": points, "size": size})
dita_data

  0%|          | 0/10336 [00:00<?, ?it/s]

shape: (10_336, 3)
┌───────┬───────────────────────────────────┬──────┐
│ id    ┆ points                            ┆ size │
│ ---   ┆ ---                               ┆ ---  │
│ i64   ┆ str                               ┆ i64  │
╞═══════╪═══════════════════════════════════╪══════╡
│ 1     ┆ 116.51172,39.92123;116.51135,39.… ┆ 588  │
│ 2     ┆ 116.36422,39.88781;116.37481,39.… ┆ 1674 │
│ 3     ┆ 116.35743,39.88957;116.35732,39.… ┆ 1371 │
│ 4     ┆ 116.47002,39.90666;116.44422,39.… ┆ 672  │
│ …     ┆ …                                 ┆ …    │
│ 10354 ┆ 116.40088,39.9877;116.4016,39.99… ┆ 1604 │
│ 10355 ┆ 116.39249,39.8817;116.39268,39.8… ┆ 379  │
│ 10356 ┆ 116.32789,39.98514;116.31804,39.… ┆ 526  │
│ 10357 ┆ 116.45186,39.93225;116.46777,39.… ┆ 734  │
└───────┴───────────────────────────────────┴──────┘


In [42]:
# Persist the per-taxi trajectory table (id, points, size) as Parquet.
# NOTE(review): the file name says "dida" while the other outputs say "dita"; confirm
# what downstream readers expect before renaming it.
dita_data.write_parquet(file="tdrive-dida.parquet", use_pyarrow=True)


590

In [31]:
# Dump every trajectory, ordered by taxi id, one "lon,lat;lon,lat;..." line per taxi.
with open("dita-all.txt", "w") as out_file:
    out_file.writelines(f"{traj}\n" for traj in dita_data.sort("id")["points"].to_list())

In [51]:
# Select up to MAX_LONG_TRAJS taxis whose trajectory has more than MIN_LONG_POINTS
# points and write them (ordered by id) to the long-trajectory DITA input file.
MIN_LONG_POINTS = 3010
MAX_LONG_TRAJS = 500
# Sort by id BEFORE truncating: dita_data's row order is not guaranteed, so taking
# the first N unsorted rows picked a different subset from run to run.
long_candidates = (
    dita_data
    .filter(pl.col("size") > MIN_LONG_POINTS)
    .sort("id")
    .head(MAX_LONG_TRAJS)
)
candidates_id = long_candidates["id"].to_list()
with open("tdrive-dita-long.txt", "w") as f:
    f.writelines(line + "\n" for line in long_candidates["points"].to_list())

In [63]:
# Sanity check: column-wise minimum over the selected long trajectories' points,
# restricted to lon > 30 and lat > 30 (presumably to ignore zero/invalid GPS fixes).
long_points_mask = (
    pl.col("id").is_in(candidates_id)
    & (pl.col("lon") > 30.0)
    & (pl.col("lat") > 30.0)
)
df_vertical_concat.filter(long_points_mask).min()

id,ts,lon,lat
i64,datetime[ms],f64,f64
10,2008-02-02 13:30:44,101.60205,30.00274


# GeoFlink Data

In [99]:
import polars as pl
import time
from itertools import islice

# Replay the first N_TRAJS long DITA trajectories as a GeoFlink input stream.
# Each line of tdrive-dita-long.txt ("lon,lat;lon,lat;...") becomes one trajectory:
#   - its id is the line index
#   - it is re-timed to one point every POINT_INTERVAL_MS, starting at the current second
#   - it is cut to its first `size` points
windows = 5
step = 8
size = (windows + 4 * step) * 4  # 4points/min
N_TRAJS = 100
POINT_INTERVAL_MS = 15000  # 15 s between points == 4 points/min
with open("tdrive-dita-long.txt") as f:
    lines = f.readlines()
start = int(time.time()) * 1000  # wall-clock base: ts values differ on every run
long_trajs = []
for idx, traj in enumerate(lines[:N_TRAJS]):
    col_id, col_ts, col_lon, col_lat = [], [], [], []
    # The point index is NOT named `step`: reusing that name clobbered the window-step
    # setting above. islice stops after `size` points instead of parsing and then slicing.
    for point_idx, p in enumerate(islice(traj.strip().split(";"), size)):
        fields = p.split(",")
        col_id.append(idx)
        col_ts.append(start + POINT_INTERVAL_MS * point_idx)
        col_lon.append(float(fields[0]))
        col_lat.append(float(fields[1]))
    long_trajs.append(pl.DataFrame({"id": col_id, "ts": col_ts, "lon": col_lon, "lat": col_lat}))
# All trajectories share the same timestamps; sorting by id as well pins the order of ties.
geoflink_data = pl.concat(
    long_trajs,
    how="vertical",
).sort(["ts", "id"])
geoflink_data

shape: (14_800, 4)
┌─────┬───────────────┬───────────┬──────────┐
│ id  ┆ ts            ┆ lon       ┆ lat      │
│ --- ┆ ---           ┆ ---       ┆ ---      │
│ i64 ┆ i64           ┆ f64       ┆ f64      │
╞═════╪═══════════════╪═══════════╪══════════╡
│ 0   ┆ 1682856577000 ┆ 116.44457 ┆ 39.92157 │
│ 1   ┆ 1682856577000 ┆ 116.47983 ┆ 39.97358 │
│ 2   ┆ 1682856577000 ┆ 116.40818 ┆ 40.0012  │
│ 3   ┆ 1682856577000 ┆ 116.43448 ┆ 39.94692 │
│ …   ┆ …             ┆ …         ┆ …        │
│ 96  ┆ 1682858782000 ┆ 116.3291  ┆ 39.95485 │
│ 97  ┆ 1682858782000 ┆ 116.35037 ┆ 39.89793 │
│ 98  ┆ 1682858782000 ┆ 116.45937 ┆ 39.9766  │
│ 99  ┆ 1682858782000 ┆ 116.46703 ┆ 39.99748 │
└─────┴───────────────┴───────────┴──────────┘


In [95]:
# Write the stream rows (id, ts, lon, lat) as a headerless CSV for GeoFlink.
geoflink_data.write_csv(file="tdrive-geoflink-long.txt", has_header=False)

# DITA pseudo-stream

In [73]:
# Keep only the first 100 long trajectories (one per line) as the small DITA input.
with open("tdrive-dita-long.txt") as src:
    lines = src.readlines()
with open("tdrive-dita-long-small.txt", "w") as dst:
    dst.writelines(lines[:100])

# Scaled-down GeoFlink stream (built from the small DITA subset)

In [102]:
import polars as pl
import time
from itertools import islice

# Replay every trajectory in tdrive-dita-long-small.txt as a GeoFlink input stream.
# Each line ("lon,lat;lon,lat;...") becomes one trajectory:
#   - its id is the line index
#   - it is re-timed to one point every POINT_INTERVAL_MS, starting at the current second
#   - it is cut to its first `size` points
windows = 5
step = 1
size = (windows + 4 * step) * 4  # 4points/min
POINT_INTERVAL_MS = 15000  # 15 s between points == 4 points/min
with open("tdrive-dita-long-small.txt") as f:
    lines = f.readlines()
start = int(time.time()) * 1000  # wall-clock base: ts values differ on every run
long_trajs = []
for idx, traj in enumerate(lines):
    col_id, col_ts, col_lon, col_lat = [], [], [], []
    # The point index is NOT named `step`: reusing that name clobbered the window-step
    # setting above. islice stops after `size` points instead of parsing and then slicing.
    for point_idx, p in enumerate(islice(traj.strip().split(";"), size)):
        fields = p.split(",")
        col_id.append(idx)
        col_ts.append(start + POINT_INTERVAL_MS * point_idx)
        col_lon.append(float(fields[0]))
        col_lat.append(float(fields[1]))
    long_trajs.append(pl.DataFrame({"id": col_id, "ts": col_ts, "lon": col_lon, "lat": col_lat}))
# All trajectories share the same timestamps; sorting by id as well pins the order of ties.
geoflink_data = pl.concat(
    long_trajs,
    how="vertical",
).sort(["ts", "id"])
geoflink_data

shape: (5_436, 4)
┌─────┬───────────────┬───────────┬──────────┐
│ id  ┆ ts            ┆ lon       ┆ lat      │
│ --- ┆ ---           ┆ ---       ┆ ---      │
│ i64 ┆ i64           ┆ f64       ┆ f64      │
╞═════╪═══════════════╪═══════════╪══════════╡
│ 0   ┆ 1682857440000 ┆ 116.44457 ┆ 39.92157 │
│ 1   ┆ 1682857440000 ┆ 116.47983 ┆ 39.97358 │
│ 2   ┆ 1682857440000 ┆ 116.40818 ┆ 40.0012  │
│ 3   ┆ 1682857440000 ┆ 116.43448 ┆ 39.94692 │
│ …   ┆ …             ┆ …         ┆ …        │
│ 147 ┆ 1682857965000 ┆ 116.363   ┆ 39.84751 │
│ 148 ┆ 1682857965000 ┆ 116.39046 ┆ 39.86599 │
│ 149 ┆ 1682857965000 ┆ 116.31125 ┆ 39.97451 │
│ 150 ┆ 1682857965000 ┆ 116.2884  ┆ 39.88844 │
└─────┴───────────────┴───────────┴──────────┘


In [103]:
# Write the small stream rows (id, ts, lon, lat) as a headerless CSV for GeoFlink.
geoflink_data.write_csv(file="tdrive-geoflink-long-small.txt", has_header=False)