In [422]:
import duckdb
conn=duckdb.connect()
conn.execute("SET timezone='UTC'")
df=conn.query("""
select maid,timestamp,latitude,longitude,flux from read_parquet('./data/raw_rt/2025-08-05/*.parquet')
""").df()

In [420]:
import pygeohash as pgh
from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp
from tqdm import tqdm

# Helper function for parallel geohash encoding
def encode_geohash_batch(args):
    """Encode a batch of lat/lon to geohash with given precision.
    args is a tuple: (lat_list, lon_list, precision)
    """
    lat_list, lon_list, precision = args
    return [pgh.encode(latitude=lat, longitude=lon, precision=precision) for lat, lon in zip(lat_list, lon_list)]
def geohash_encode(df,gh_batch_size=500000,gh_precision=7,gh_chunksize=10):
    cpu_count = mp.cpu_count()
    gh_workers = max(1, min(cpu_count, 96))
    print("Encoding geohashes in parallel...")
    total_rows = len(df)
    geohashes = []
    # Prepare batch tasks
    batch_args = []
    for i in range(0, total_rows, gh_batch_size):
        batch_end = min(i + gh_batch_size, total_rows)
        print(f"Geohash batch {i//gh_batch_size + 1}/{(total_rows-1)//gh_batch_size + 1} ({batch_end - i} rows)")
        # Slice as Python lists for cheaper pickling
        batch_lat = df['latitude'].iloc[i:batch_end].tolist()
        batch_lon = df['longitude'].iloc[i:batch_end].tolist()
        batch_args.append((batch_lat, batch_lon, gh_precision))

    if len(batch_args) == 1:
        # Single batch: avoid process pool overhead
        geohashes = encode_geohash_batch(batch_args[0])
    else:
        with ProcessPoolExecutor(max_workers=gh_workers) as gh_executor:
            for result in tqdm(
                gh_executor.map(encode_geohash_batch, batch_args, chunksize=gh_chunksize),
                total=len(batch_args),
                desc=f"Geohash encoding"
            ):
                geohashes.extend(result)

    # Add geohashes to dataframe
    df['geohash'] = geohashes
    return df

In [423]:
df=geohash_encode(df,gh_precision=12)

Encoding geohashes in parallel...
Geohash batch 1/11 (500000 rows)
Geohash batch 2/11 (500000 rows)
Geohash batch 3/11 (500000 rows)
Geohash batch 4/11 (500000 rows)
Geohash batch 5/11 (500000 rows)
Geohash batch 6/11 (500000 rows)
Geohash batch 7/11 (500000 rows)
Geohash batch 8/11 (500000 rows)
Geohash batch 9/11 (500000 rows)
Geohash batch 10/11 (500000 rows)
Geohash batch 11/11 (429854 rows)


Geohash encoding: 100%|██████████| 11/11 [00:08<00:00,  1.30it/s]


In [614]:
cc=df.groupby('maid').size().sort_values(ascending=False).to_frame().reset_index()



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



In [642]:
import pandas as pd
id=54321
sample_maid=df[df.maid==cc.maid.iloc[id]]
sample_maid.loc[:, 'date'] = sample_maid.timestamp.dt.date.astype(str)
sample_maid.loc[:, 'hour'] = sample_maid.timestamp.dt.hour.astype(int)
dk=sample_maid.groupby('geohash')['geohash'].transform('count') > 1
duplicates=sample_maid.loc[dk]
notduplicates=sample_maid.loc[~dk]

grouped_dup=sample_maid.groupby(['date','hour']).agg(
    std_gh=('geohash',lambda x: pgh.std(x.tolist())),
    geohash=('geohash',lambda x: pgh.mean(x.tolist())),
    count=('geohash',lambda x: x.nunique()),
    lat=('latitude',lambda x: x.mean()),
    lon=('longitude',lambda x: x.mean())
).reset_index()

import plotly.graph_objects as go

# Create figure
fig = go.Figure()

# Add pingsink points (default blue)
pingsink_data = grouped_dup.loc[(grouped_dup['count']>20) & (grouped_dup['std_gh']<5)]
fig.add_trace(go.Scattermapbox(
    lat=pingsink_data['lat'],
    lon=pingsink_data['lon'],
    mode='markers',
    marker=dict(size=8),
    name='cluster',
    hovertemplate='<br>'.join([
        'geohash: %{customdata[0]}',
        'count: %{customdata[1]}',
        'std_gh: %{customdata[2]}',
        '<extra></extra>'
    ]),
    customdata=pingsink_data[['geohash','count','std_gh']].values
))

# Add normal points (red)
normal_data = sample_maid.drop_duplicates(subset=['geohash'])
fig.add_trace(go.Scattermapbox(
    lat=normal_data['latitude'],
    lon=normal_data['longitude'],
    mode='markers',
    marker=dict(size=8, color='red'),
    name='path',
    hovertemplate='<br>'.join([
        'geohash: %{customdata[0]}',
        'timestamp: %{customdata[1]}',
        '<extra></extra>'
    ]),
    customdata=normal_data[['geohash', 'timestamp']].values
))
fig.update_layout(
    mapbox_style="open-street-map",
    mapbox=dict(
        center=dict(
            lat=grouped_dup['lat'].mean(),
            lon=grouped_dup['lon'].mean()
        ),
        zoom=12
    ),
    height=600
)
fig.show()
pingsink_data



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


*scattermapbox* is deprecated! Use *scattermap* instead. Learn more at: https://plotly.com/python/mapbox-to-maplibre/


*scattermapbox* is deprecated! Use *scattermap* instead. Learn more at: https://plotly.com/python/mapbox-to-maplibre/



Unnamed: 0,date,hour,std_gh,geohash,count,lat,lon
