In [1]:
import config
import os
import datetime
import random
import pandas as pd
import dask
import dask.dataframe as dd

import math
import geopandas as gpd
from shapely.geometry import Polygon
import scienceplots
import matplotlib.pyplot as plt
# Use the 'science' matplotlib style for every figure in this notebook
# (presumably made available by the `scienceplots` import — confirm).
plt.style.use(['science'])


def gps_to_grid(lat, lon):
    """Map a GPS coordinate to the ``(lat_index, lon_index)`` of its grid cell.

    Reads the module-level ``min_lat``, ``min_lon``, ``delta_lat`` and
    ``delta_lon``. The grid origin is placed half a cell below/left of the
    minimum coordinate, and the offset is truncated with ``int``.
    """
    south_edge = min_lat - delta_lat / 2
    west_edge = min_lon - delta_lon / 2
    row = int((lat - south_edge) / delta_lat)
    col = int((lon - west_edge) / delta_lon)
    return row, col

def grid_to_gps(lat_index, lon_index):
    """Return the ``(lat, lon)`` centre point of the grid cell at the given indices.

    Uses the module-level ``min_lat``/``min_lon`` as the centre of cell 0 and
    ``delta_lat``/``delta_lon`` as the cell size in degrees.
    """
    centre_lat = lat_index * delta_lat + min_lat
    centre_lon = lon_index * delta_lon + min_lon
    return centre_lat, centre_lon

def unix_to_datetime(timestamp: str, tz: "datetime.tzinfo | None" = None) -> datetime.datetime:
    """Convert a Unix timestamp (seconds since the epoch) to a ``datetime``.

    Args:
        timestamp: Seconds since the epoch, as a string (or anything ``int()``
            accepts).
        tz: Optional time zone. When ``None`` (the default, and the original
            behaviour) the result is a *naive* datetime expressed in the
            machine's local time zone, so the wall-clock value depends on
            where the notebook runs. Pass a ``tzinfo`` to get a
            reproducible, timezone-aware result.

    Returns:
        The corresponding ``datetime.datetime``.

    Raises:
        ValueError: If ``timestamp`` cannot be parsed as an integer.

    >>> cst = datetime.timezone(datetime.timedelta(hours=8))
    >>> dt = unix_to_datetime('1538402919', tz=cst)
    >>> print(dt.strftime('%Y/%m/%d %H:%M:%S'))
    2018/10/01 22:08:39
    """
    return datetime.datetime.fromtimestamp(int(timestamp), tz=tz)

# Root directory of the Chengdu dataset, taken from the project-local `config` module;
# every input file and derived cache in this notebook lives under it.
dataset = config.chengdu_dataset

# Preparation: cleaned, selected, expanded

## Duplicates

In [2]:
# Collect the raw Chengdu CSV files. os.listdir() returns entries in arbitrary
# order, so sort them to keep the partition order (and therefore head())
# reproducible across runs and machines.
filepaths = sorted(
    os.path.join(dataset, fname)
    for fname in os.listdir(dataset)
    if fname.startswith('chengdu') and fname.endswith('.csv')
)

# The files have no header row; each line is (order id, worker id, trajectory string).
# blocksize is given as an integer number of bytes (64e6 was a float).
ddf_origin = dd.read_csv(filepaths, header=None, names=['oid', 'wid', 'traj'], blocksize=64_000_000)
ddf_origin.head()

Unnamed: 0,oid,wid,traj
0,92a1d44607c036fbc9db971258c0e013,f5e36c97e4a8052a2aa9c7f8ed0f969a,"[104.04538 30.70745 1538402919, 104.04538 30.7..."
1,0e95f18113ad01cf626289344fd39f4e,c5f1bba8b2cfad357a9ed65dc3021ff1,"[104.08169 30.6705 1538406898, 104.08169 30.67..."
2,7dceae818438b836e3d306296b4ccfbd,0e3f29616d126177f5db9bfb27be21de,"[104.04235 30.69204 1538334938, 104.04268 30.6..."
3,94bba68a63c866769c1c3b37834518fa,c480964f535eede621a1ac4dde7073a6,"[104.04714 30.7263 1538345323, 104.04698 30.72..."
4,8601cc60cfea2fc7f599ebf9b2a4d977,5f3cabc6ff9083e6c79f3f798a9c17e2,"[104.11542 30.65289 1538322576, 104.11574 30.6..."


In [3]:
output_cleaned = os.path.join(dataset, 'cleaned')

# (Re)build the de-duplicated parquet cache only when it is missing or empty.
if not (os.path.exists(output_cleaned) and len(os.listdir(output_cleaned)) != 0):
    os.makedirs(output_cleaned, exist_ok=True)
    ddf_origin = dd.read_csv(filepaths, header=None, names=['oid', 'wid', 'traj'])
    # An order id that appears more than once is dropped entirely:
    # none of its copies is kept.
    oid_counts = ddf_origin['oid'].value_counts().compute()
    repeated = oid_counts > 1
    duplicate_orders = oid_counts[repeated].index.tolist()
    ddf_origin[~ddf_origin['oid'].isin(duplicate_orders)].to_parquet(output_cleaned)

# Only the data part files are read back; any other file in the directory is ignored.
cleaned_files = [
    os.path.join(output_cleaned, fname)
    for fname in os.listdir(output_cleaned)
    if fname.startswith('part') and fname.endswith('.parquet')
]

ddf_cleaned = dd.read_parquet(cleaned_files)
ddf_cleaned.head()

Unnamed: 0,oid,wid,traj
0,92a1d44607c036fbc9db971258c0e013,f5e36c97e4a8052a2aa9c7f8ed0f969a,"[104.04538 30.70745 1538402919, 104.04538 30.7..."
1,0e95f18113ad01cf626289344fd39f4e,c5f1bba8b2cfad357a9ed65dc3021ff1,"[104.08169 30.6705 1538406898, 104.08169 30.67..."
2,7dceae818438b836e3d306296b4ccfbd,0e3f29616d126177f5db9bfb27be21de,"[104.04235 30.69204 1538334938, 104.04268 30.6..."
3,94bba68a63c866769c1c3b37834518fa,c480964f535eede621a1ac4dde7073a6,"[104.04714 30.7263 1538345323, 104.04698 30.72..."
4,8601cc60cfea2fc7f599ebf9b2a4d977,5f3cabc6ff9083e6c79f3f798a9c17e2,"[104.11542 30.65289 1538322576, 104.11574 30.6..."


In [4]:
# Workers are kept only if they have strictly more than this many orders.
min_orders = 30
workers_path = os.path.join(dataset, 'workers.txt')

if not os.path.exists(workers_path):
    # `wid_counts` was previously never defined in this notebook, so this branch
    # raised NameError on a fresh kernel. Compute the per-worker order counts here.
    # NOTE(review): counting on the de-duplicated data (ddf_cleaned) is assumed to
    # match how the existing workers.txt was produced — confirm.
    wid_counts = (
        ddf_cleaned['wid'].value_counts().compute()
        # Pin the CSV header to "wid,count" regardless of pandas version;
        # later cells read the `wid` column.
        .rename_axis('wid')
        .rename('count')
    )
    active_workers = wid_counts[wid_counts > min_orders]
    print(len(active_workers))
    active_workers.to_csv(workers_path)

# Always load from disk so `workers` is a DataFrame with `wid` and `count` columns.
workers = pd.read_csv(workers_path)
workers

Unnamed: 0,wid,count
0,248e83a8ef8b7af1815873f9d488fb2b,769
1,9158ae4e303282353602a08135ca505e,737
2,eefefbc7788b7c934c97a19a52845228,718
3,92152394842cba2d5d19f12ebf22d44f,718
4,ad23645b75259f69658a11a1de55abb0,679
...,...,...
85012,e48d949a7a90f17d06fb73c99cde9fd7,31
85013,2b11d43ba53434c69dfb1ee8d7cffe27,31
85014,1521f29cf5a547d3ef622a0d1776a2ab,31
85015,7ed98cdc1851b653be6ecbe14d3b58b3,31


In [5]:
output_selected = os.path.join(dataset, 'selected')

# (Re)build the cache of orders belonging to the selected workers only when it
# is missing or empty.
if not os.path.exists(output_selected) or len(os.listdir(output_selected)) == 0:
    # Fixed: this used the undefined name `selected`, which raised NameError
    # whenever the cache had to be built.
    os.makedirs(output_selected, exist_ok=True)
    ddf_cleaned = dd.read_parquet(output_cleaned)
    # Pass a plain list so dask does not have to handle a pandas Series here.
    selected_wids = workers['wid'].tolist()
    ddf_cleaned[ddf_cleaned['wid'].isin(selected_wids)].to_parquet(output_selected)

# Only the data part files are read back; any other file in the directory is ignored.
selected_files = [os.path.join(output_selected, i)
                  for i in os.listdir(output_selected)
                  if i.startswith('part') and i.endswith('.parquet')]

ddf_selected = dd.read_parquet(selected_files)
ddf_selected.head()

Unnamed: 0,oid,wid,traj
1,0e95f18113ad01cf626289344fd39f4e,c5f1bba8b2cfad357a9ed65dc3021ff1,"[104.08169 30.6705 1538406898, 104.08169 30.67..."
2,7dceae818438b836e3d306296b4ccfbd,0e3f29616d126177f5db9bfb27be21de,"[104.04235 30.69204 1538334938, 104.04268 30.6..."
3,94bba68a63c866769c1c3b37834518fa,c480964f535eede621a1ac4dde7073a6,"[104.04714 30.7263 1538345323, 104.04698 30.72..."
4,8601cc60cfea2fc7f599ebf9b2a4d977,5f3cabc6ff9083e6c79f3f798a9c17e2,"[104.11542 30.65289 1538322576, 104.11574 30.6..."
5,bc4ed5a4efc9d75add3b4767efe5aabf,c6cad4068fb04e30cda89c75673ee363,"[104.07026 30.72456 1538322724, 104.07011 30.7..."


## Format

In [6]:
# Keep the first rows of the selected data in `peek` so the raw `traj` string
# format can be inspected before it is parsed.
peek = ddf_selected.head()
peek

Unnamed: 0,oid,wid,traj
1,0e95f18113ad01cf626289344fd39f4e,c5f1bba8b2cfad357a9ed65dc3021ff1,"[104.08169 30.6705 1538406898, 104.08169 30.67..."
2,7dceae818438b836e3d306296b4ccfbd,0e3f29616d126177f5db9bfb27be21de,"[104.04235 30.69204 1538334938, 104.04268 30.6..."
3,94bba68a63c866769c1c3b37834518fa,c480964f535eede621a1ac4dde7073a6,"[104.04714 30.7263 1538345323, 104.04698 30.72..."
4,8601cc60cfea2fc7f599ebf9b2a4d977,5f3cabc6ff9083e6c79f3f798a9c17e2,"[104.11542 30.65289 1538322576, 104.11574 30.6..."
5,bc4ed5a4efc9d75add3b4767efe5aabf,c6cad4068fb04e30cda89c75673ee363,"[104.07026 30.72456 1538322724, 104.07011 30.7..."


In [None]:
# Empty frame describing the output schema (column order and dtypes) of the
# per-partition trajectory expansion; handed to dask as `meta`.
meta = pd.DataFrame({
    column: pd.Series(dtype=dtype)
    for column, dtype in (
        ('wid', 'object'),
        ('oid', 'object'),
        ('lon', 'float64'),
        ('lat', 'float64'),
        ('dt', 'datetime64[ns]'),
    )
})

def apply_func(row):
    """Expand one order row into a DataFrame with one record per trajectory point.

    ``row['traj']`` is a string wrapped in one leading and one trailing
    character (brackets in the sample data) whose comma-separated entries are
    ``"lon lat unix_timestamp"``. Each entry becomes a record with the row's
    ``wid``/``oid``, the coordinates as floats and the timestamp converted by
    ``unix_to_datetime``.
    """
    wid, oid = row['wid'], row['oid']
    records = []
    # Strip the enclosing characters, then walk the comma-separated points.
    for raw_point in row['traj'][1:-1].split(','):
        lon_text, lat_text, timestamp = raw_point.strip().split(' ')
        records.append({
            'wid': wid,
            'oid': oid,
            'lon': float(lon_text),
            'lat': float(lat_text),
            'dt': unix_to_datetime(timestamp),
        })
    return pd.DataFrame(records)

def process_partition(partition):
    """Expand every order row of one partition into per-point records.

    Args:
        partition: A pandas DataFrame with ``wid``, ``oid`` and ``traj`` columns.

    Returns:
        The concatenation of ``apply_func(row)`` for each row. For an empty
        partition, an empty frame with the schema of the module-level ``meta``
        is returned (``pd.concat`` raises ``ValueError`` on an empty list).
    """
    frames = [apply_func(row) for _, row in partition.iterrows()]
    if not frames:
        # Partitions can be empty after filtering; keep the declared schema.
        return meta.iloc[:0].copy()
    return pd.concat(frames)

output_expended = os.path.join(dataset, 'expended')

# Run the (expensive) per-point expansion only when its parquet cache is
# missing or empty.
if not (os.path.exists(output_expended) and len(os.listdir(output_expended)) != 0):
    expanded = ddf_selected.map_partitions(process_partition, meta=meta)
    expanded.to_parquet(output_expended)

# Only the data part files are read back; any other file in the directory is ignored.
expended_files = [
    os.path.join(output_expended, fname)
    for fname in os.listdir(output_expended)
    if fname.startswith('part') and fname.endswith('.parquet')
]

ddf_expended = dd.read_parquet(expended_files)
ddf_expended.head(67)

## Grids

In [None]:
# Bounding box of all trajectory points; the four reductions are evaluated
# together in a single dd.compute call.
lat_col = ddf_expended['lat']
lon_col = ddf_expended['lon']
min_lat, max_lat, min_lon, max_lon = dd.compute(
    lat_col.min(), lat_col.max(), lon_col.min(), lon_col.max()
)

print(f'min_lat={min_lat}, max_lat={max_lat}, min_lon={min_lon}, max_lon={max_lon}')

In [None]:
grid_size = config.grid_accuracy
earth_radius = config.earth_radius
# Cell size in degrees: arc length `grid_size` on a sphere of radius `earth_radius`.
# (assumes both config values use the same length unit — TODO confirm)
# Longitude degrees shrink with cos(latitude); the mean latitude of the
# bounding box is used, converted to radians.
delta_lat = grid_size * 360 / (2 * math.pi * earth_radius)
delta_lon = grid_size * 360 / (2 * math.pi * earth_radius * math.cos((min_lat + max_lat) * math.pi / 360))

output_grid = os.path.join(dataset, 'grid.shp')

# Cell (i, j) is centred on (min_lon + i * delta_lon, min_lat + j * delta_lat)
# and extends half a cell to each side. The number of cells therefore has to be
# "index of the maximum coordinate + 1", using the same index formula as
# gps_to_grid. The previous int(span / delta) + 1 left points near the
# top/right edge outside the grid whenever the fractional part of span / delta
# exceeded 0.5.
num_lon_grids = int((max_lon - (min_lon - delta_lon / 2)) / delta_lon) + 1
num_lat_grids = int((max_lat - (min_lat - delta_lat / 2)) / delta_lat) + 1

# One record per cell: its indices, centre point and rectangular footprint.
grid_data_list = []
for i in range(num_lon_grids):
    center_lon = i * delta_lon + min_lon
    west, east = center_lon - delta_lon / 2, center_lon + delta_lon / 2
    for j in range(num_lat_grids):
        center_lat = j * delta_lat + min_lat
        south, north = center_lat - delta_lat / 2, center_lat + delta_lat / 2
        grid_data_list.append({
            'lon_index': i,
            'lat_index': j,
            'lon_center': center_lon,
            'lat_center': center_lat,
            'geometry': Polygon([(west, south), (east, south), (east, north), (west, north)]),
        })

# Build the GeoDataFrame with its geometry column declared up front
# (no in-place mutation afterwards).
grid_data = gpd.GeoDataFrame(
    grid_data_list,
    columns=['lon_index', 'lat_index', 'lon_center', 'lat_center', 'geometry'],
    geometry='geometry',
)
# Plot the grid data
grid_data.plot(edgecolor='black', linewidth=0.3, figsize=(20, 15))
# Write to the single path defined above, then read it back from disk.
grid_data.to_file(output_grid)

gdf_grid = gpd.read_file(output_grid)
gdf_grid

# Look back

In [None]:
# Look back at the de-duplicated orders: one row per order, trajectory still a raw string.
ddf_cleaned.head()

In [None]:
# Look back at the expanded data: one row per trajectory point (wid, oid, lon, lat, dt).
ddf_expended.head()

In [None]:
output_tasks = os.path.join(dataset, 'tasks.txt')

# The expanded data only carries raw lon/lat (the grid-index computation in
# apply_func is commented out), so selecting 'lon_index'/'lat_index' directly
# raised KeyError. Derive the indices here with the same formula as gps_to_grid;
# astype truncates like int() for these non-negative offsets.
lon_index = ((ddf_expended['lon'] - (min_lon - delta_lon / 2)) / delta_lon).astype('int64')
lat_index = ((ddf_expended['lat'] - (min_lat - delta_lat / 2)) / delta_lat).astype('int64')

# A "task" is a grid cell visited by at least one trajectory point.
tasks = (
    ddf_expended
    .assign(lon_index=lon_index, lat_index=lat_index)
    [['lon_index', 'lat_index']]
    .drop_duplicates()
    .compute()
)
tasks

In [None]:
# Persist the task list. Calling to_csv() without a path only returned the CSV
# text and wrote nothing. The row index is meaningless after drop_duplicates,
# so it is not written.
tasks.to_csv(output_tasks, index=False)

In [None]:
# Keep only the grid cells that occur in `tasks` (inner join on the cell
# indices) and draw them.
grid_keys = ['lon_index', 'lat_index']
gdf_grid_filtered = gdf_grid.merge(tasks[grid_keys], how='inner', on=grid_keys)
gdf_grid_filtered.plot(figsize=(20, 15), linewidth=0.3, edgecolor='black')

In [None]:
# Draw the complete grid with the same styling as the filtered plot, for visual comparison.
gdf_grid.plot(edgecolor='black', linewidth=0.3, figsize=(20, 15))

# Scratch: ArrivePro & Qualific

对一个特定的工人、任务：

ArrivePro：有记录天数除以总天数，代表工人到达当前任务的概率；

qPro：质量达标（停留秒数大于阈值）天数除以有记录天数，代表完成该任务的概率。

In [None]:
# NOTE(review): unfinished scratch cell. '' is not one of the columns produced by
# the expansion step (wid, oid, lon, lat, dt), so this lookup is expected to
# raise KeyError — TODO fill in the intended column or delete the cell.
ddf_expended['']

In [6]:
# NOTE(review): hard-coded constants whose meaning is not shown in this notebook
# (possibly worker/task or grid dimensions) — TODO document or derive from data.
n, m = 1140, 565