# Clusterring Tipping Pattern in New York Taxi Dataset

Dataset that will be used here are the same as data for regression. But since clustering is unsupervised learning the data won't be separated as target. It can be feeded as a whole. Because K means is sensitive with outlier, scaling and outlier remover are added in the pipeline.

## 1.1 Clustering Tipping Pattern on Yellow and Green Taxi

In [2]:
import duckdb

In [2]:
query_yellow_green = """
WITH CTE_yellow_2009 AS (
    SELECT 
        CAST(Trip_Pickup_DateTime AS TIMESTAMP) AS pick_up_time,
        CAST(Trip_Dropoff_DateTime AS TIMESTAMP) AS drop_off_time,
        CAST(Passenger_Count AS INTEGER) AS passenger_count,
        CAST(Trip_Distance AS INTEGER) AS trip_distance,
        Payment_Type AS payment_type,
        CAST(Total_Amt AS FLOAT)   AS total_amount,
        Tip_Amt AS tip_amount,
        CAST(Tip_Amt AS FLOAT)   AS tip_amount,
        CAST( CASE Payment_Type
            WHEN  'Credit' THEN 1
            WHEN  'CREDIT' THEN 1
            ELSE 2
        END AS INTEGER) AS payment_category
    FROM 'C:/Users/ekadw/Documents/DATA/NY_Taxi/2009/yellow_taxi_2009/yellow_tripdata_*.parquet'
    WHERE Trip_Pickup_DateTime IS NOT NULL
        AND Trip_Dropoff_DateTime IS NOT NULL
        AND Passenger_Count >= 0
        AND Trip_Distance >= 0 
        AND Trip_Distance <= 50
        AND Payment_Type IS NOT NULL
        AND Total_Amt >= 0
        AND Tip_Amt >= 0
        AND Trip_Pickup_DateTime >= '2009-01-01' 
        AND Trip_Pickup_DateTime < '2010-01-01'
), CTE_duration_yellow_2009 AS (
    SELECT
        pick_up_time,
        drop_off_time,
        passenger_count,
        trip_distance,
        total_amount,
        payment_category,
        tip_amount,
        DATE_DIFF('day', pick_up_time, drop_off_time) AS duration_days,
        EPOCH(drop_off_time - pick_up_time) AS duration_seconds
    FROM CTE_yellow_2009
    WHERE payment_category = 1
), CTE_yellow_2010 AS (
    SELECT 
        CAST(pickup_datetime AS TIMESTAMP) AS pick_up_time,
        CAST(dropoff_datetime AS TIMESTAMP) AS drop_off_time,
        CAST(passenger_count AS INTEGER) AS passenger_count,
        CAST(total_amount AS FLOAT)   AS total_amount,
        CAST(trip_distance AS FLOAT)   AS trip_distance,
        payment_type,
        tip_amount,
        CAST( CASE payment_type
            WHEN  'Cre' THEN 1
            WHEN  'CRE' THEN 1
            ELSE 2
        END AS INTEGER) AS payment_category
    FROM 'C:/Users/ekadw/Documents/DATA/NY_Taxi/2010/yellow_taxi_2010/yellow_tripdata_*.parquet'
    WHERE pickup_datetime IS NOT NULL
        AND dropoff_datetime  IS NOT NULL
        AND passenger_count >= 0
        AND trip_distance >= 0
        AND trip_distance <= 50
        AND payment_type IS NOT NULL
        AND total_amount >= 0
        AND tip_amount >= 0 
        AND pickup_datetime >= '2010-01-01' 
        AND pickup_datetime < '2011-01-01'
), CTE_duration_yellow_2010 AS (
    SELECT
        pick_up_time,
        drop_off_time,
        passenger_count,
        trip_distance,
        total_amount,
        payment_category,
        tip_amount,
        DATE_DIFF('day', pick_up_time, drop_off_time) AS duration_days,
        EPOCH(drop_off_time - pick_up_time) AS duration_seconds
    FROM CTE_yellow_2010
    WHERE payment_category = 1
), CTE_yellow_2011_2023 AS (
    SELECT 
        tpep_pickup_datetime AS pick_up_time,
        tpep_dropoff_datetime AS drop_off_time,
        CAST(passenger_count AS INTEGER) AS passenger_count,
        CAST(total_amount AS FLOAT)   AS total_amount,
        CAST(trip_distance AS FLOAT)   AS trip_distance,
        CAST(payment_type AS INTEGER) AS payment_category,
        tip_amount
    FROM 'C:/Users/ekadw/Documents/DATA/NY_Taxi/*/yellow_taxi/yellow_tripdata_*.parquet'
    WHERE tpep_pickup_datetime IS NOT NULL
        AND tpep_dropoff_datetime  IS NOT NULL
        AND passenger_count IS NOT NULL
        AND trip_distance >= 0
        AND trip_distance <= 50
        AND payment_type IS NOT NULL
        AND total_amount >= 0
        AND tip_amount >= 0
        AND tpep_pickup_datetime >= '2011-01-01' 
        AND tpep_pickup_datetime < '2023-10-01'
), CTE_duration_yellow_2011_2023 AS (
    SELECT
        pick_up_time,
        drop_off_time,
        passenger_count,
        trip_distance,
        total_amount,
        payment_category,
        tip_amount,
        DATE_DIFF('day', pick_up_time, drop_off_time) AS duration_days,
        EPOCH(drop_off_time - pick_up_time) AS duration_seconds
    FROM CTE_yellow_2011_2023
    WHERE payment_category = 1
), CTE_green_2011_2023 AS (
    SELECT 
        lpep_pickup_datetime AS pick_up_time,
        lpep_dropoff_datetime AS drop_off_time,
        CAST(passenger_count AS INTEGER) AS passenger_count,
        CAST(total_amount AS FLOAT)   AS total_amount,
        CAST(trip_distance AS FLOAT)   AS trip_distance,
        CAST(payment_type AS INTEGER) AS payment_category,
        tip_amount
    FROM 'C:/Users/ekadw/Documents/DATA/NY_Taxi/*/green_taxi/green_tripdata_*.parquet'
    WHERE lpep_pickup_datetime IS NOT NULL
        AND lpep_dropoff_datetime  IS NOT NULL
        AND passenger_count IS NOT NULL
        AND trip_distance >= 0
        AND trip_distance <= 50
        AND payment_type IS NOT NULL
        AND total_amount >= 0
        AND tip_amount >= 0
        AND lpep_pickup_datetime >= '2009-01-01' 
        AND lpep_pickup_datetime < '2023-10-01'
), CTE_duration_green_2011_2023 AS (
    SELECT
        pick_up_time,
        drop_off_time,
        passenger_count,
        trip_distance,
        total_amount,
        payment_category,
        tip_amount,
        DATE_DIFF('day', pick_up_time, drop_off_time) AS duration_days,
        CAST(EPOCH(drop_off_time - pick_up_time) AS FLOAT) AS duration_seconds
    FROM CTE_green_2011_2023
    WHERE payment_category = 1
), CTE_union_all AS (
    SELECT * FROM CTE_duration_yellow_2009
    UNION ALL
    SELECT * FROM CTE_duration_yellow_2010
    UNION ALL
    SELECT * FROM CTE_duration_yellow_2011_2023
    UNION ALL
    SELECT * FROM CTE_duration_green_2011_2023
)

SELECT 
    passenger_count,
    trip_distance,
    total_amount,
    duration_seconds,
    tip_amount
FROM CTE_union_all
WHERE (duration_days = 0) AND (duration_seconds > 0)
LIMIT 5
"""

con = duckdb.connect()
df_yellow_green = con.execute(query_yellow_green).fetchdf()
df_yellow_green.head()

Unnamed: 0,passenger_count,trip_distance,total_amount,duration_seconds,tip_amount
0,3,5.0,14.6,420.0,2.0
1,5,10.0,28.440001,840.0,4.74
2,1,5.0,18.450001,1262.0,3.05
3,1,0.0,6.7,585.0,1.0
4,1,2.0,10.0,679.0,1.3


In [9]:
query_yellow_green = """
WITH CTE_yellow_2009 AS (
    SELECT 
        CAST(Trip_Pickup_DateTime AS TIMESTAMP) AS pick_up_time,
        CAST(Trip_Dropoff_DateTime AS TIMESTAMP) AS drop_off_time,
        CAST(Passenger_Count AS INTEGER) AS passenger_count,
        CAST(Trip_Distance AS INTEGER) AS trip_distance,
        Payment_Type AS payment_type,
        CAST(Total_Amt AS FLOAT)   AS total_amount,
        Tip_Amt AS tip_amount,
        CAST(Tip_Amt AS FLOAT)   AS tip_amount,
        CAST( CASE Payment_Type
            WHEN  'Credit' THEN 1
            WHEN  'CREDIT' THEN 1
            ELSE 2
        END AS INTEGER) AS payment_category
    FROM 'C:/Users/ekadw/Documents/DATA/NY_Taxi/2009/yellow_taxi_2009/yellow_tripdata_*.parquet'
    WHERE Trip_Pickup_DateTime IS NOT NULL
        AND Trip_Dropoff_DateTime IS NOT NULL
        AND Passenger_Count >= 0
        AND Trip_Distance >= 0 
        AND Trip_Distance <= 50
        AND Payment_Type IS NOT NULL
        AND Total_Amt >= 0
        AND Tip_Amt >= 0
        AND Trip_Pickup_DateTime >= '2009-01-01' 
        AND Trip_Pickup_DateTime < '2010-01-01'
), CTE_duration_yellow_2009 AS (
    SELECT
        pick_up_time,
        drop_off_time,
        passenger_count,
        trip_distance,
        total_amount,
        payment_category,
        tip_amount,
        DATE_DIFF('day', pick_up_time, drop_off_time) AS duration_days,
        EPOCH(drop_off_time - pick_up_time) AS duration_seconds
    FROM CTE_yellow_2009
    WHERE payment_category = 1
), CTE_yellow_2010 AS (
    SELECT 
        CAST(pickup_datetime AS TIMESTAMP) AS pick_up_time,
        CAST(dropoff_datetime AS TIMESTAMP) AS drop_off_time,
        CAST(passenger_count AS INTEGER) AS passenger_count,
        CAST(total_amount AS FLOAT)   AS total_amount,
        CAST(trip_distance AS FLOAT)   AS trip_distance,
        payment_type,
        tip_amount,
        CAST( CASE payment_type
            WHEN  'Cre' THEN 1
            WHEN  'CRE' THEN 1
            ELSE 2
        END AS INTEGER) AS payment_category
    FROM 'C:/Users/ekadw/Documents/DATA/NY_Taxi/2010/yellow_taxi_2010/yellow_tripdata_*.parquet'
    WHERE pickup_datetime IS NOT NULL
        AND dropoff_datetime  IS NOT NULL
        AND passenger_count >= 0
        AND trip_distance >= 0
        AND trip_distance <= 50
        AND payment_type IS NOT NULL
        AND total_amount >= 0
        AND tip_amount >= 0 
        AND pickup_datetime >= '2010-01-01' 
        AND pickup_datetime < '2011-01-01'
), CTE_duration_yellow_2010 AS (
    SELECT
        pick_up_time,
        drop_off_time,
        passenger_count,
        trip_distance,
        total_amount,
        payment_category,
        tip_amount,
        DATE_DIFF('day', pick_up_time, drop_off_time) AS duration_days,
        EPOCH(drop_off_time - pick_up_time) AS duration_seconds
    FROM CTE_yellow_2010
    WHERE payment_category = 1
), CTE_yellow_2011_2023 AS (
    SELECT 
        tpep_pickup_datetime AS pick_up_time,
        tpep_dropoff_datetime AS drop_off_time,
        CAST(passenger_count AS INTEGER) AS passenger_count,
        CAST(total_amount AS FLOAT)   AS total_amount,
        CAST(trip_distance AS FLOAT)   AS trip_distance,
        CAST(payment_type AS INTEGER) AS payment_category,
        tip_amount
    FROM 'C:/Users/ekadw/Documents/DATA/NY_Taxi/*/yellow_taxi/yellow_tripdata_*.parquet'
    WHERE tpep_pickup_datetime IS NOT NULL
        AND tpep_dropoff_datetime  IS NOT NULL
        AND passenger_count IS NOT NULL
        AND trip_distance >= 0
        AND trip_distance <= 50
        AND payment_type IS NOT NULL
        AND total_amount >= 0
        AND tip_amount >= 0
        AND tpep_pickup_datetime >= '2011-01-01' 
        AND tpep_pickup_datetime < '2023-10-01'
), CTE_duration_yellow_2011_2023 AS (
    SELECT
        pick_up_time,
        drop_off_time,
        passenger_count,
        trip_distance,
        total_amount,
        payment_category,
        tip_amount,
        DATE_DIFF('day', pick_up_time, drop_off_time) AS duration_days,
        EPOCH(drop_off_time - pick_up_time) AS duration_seconds
    FROM CTE_yellow_2011_2023
    WHERE payment_category = 1
), CTE_green_2011_2023 AS (
    SELECT 
        lpep_pickup_datetime AS pick_up_time,
        lpep_dropoff_datetime AS drop_off_time,
        CAST(passenger_count AS INTEGER) AS passenger_count,
        CAST(total_amount AS FLOAT)   AS total_amount,
        CAST(trip_distance AS FLOAT)   AS trip_distance,
        CAST(payment_type AS INTEGER) AS payment_category,
        tip_amount
    FROM 'C:/Users/ekadw/Documents/DATA/NY_Taxi/*/green_taxi/green_tripdata_*.parquet'
    WHERE lpep_pickup_datetime IS NOT NULL
        AND lpep_dropoff_datetime  IS NOT NULL
        AND passenger_count IS NOT NULL
        AND trip_distance >= 0
        AND trip_distance <= 50
        AND payment_type IS NOT NULL
        AND total_amount >= 0
        AND tip_amount >= 0
        AND lpep_pickup_datetime >= '2009-01-01' 
        AND lpep_pickup_datetime < '2023-10-01'
), CTE_duration_green_2011_2023 AS (
    SELECT
        pick_up_time,
        drop_off_time,
        passenger_count,
        trip_distance,
        total_amount,
        payment_category,
        tip_amount,
        DATE_DIFF('day', pick_up_time, drop_off_time) AS duration_days,
        CAST(EPOCH(drop_off_time - pick_up_time) AS FLOAT) AS duration_seconds
    FROM CTE_green_2011_2023
    WHERE payment_category = 1
), CTE_union_all AS (
    SELECT * FROM CTE_duration_yellow_2009
    UNION ALL
    SELECT * FROM CTE_duration_yellow_2010
    UNION ALL
    SELECT * FROM CTE_duration_yellow_2011_2023
    UNION ALL
    SELECT * FROM CTE_duration_green_2011_2023
)

SELECT 
    passenger_count,
    trip_distance,
    total_amount,
    duration_seconds,
    tip_amount
FROM CTE_union_all
WHERE (duration_days = 0) AND (duration_seconds > 0)
LIMIT 50000000
"""

#con = duckdb.connect()
#df_yellow_green = con.execute(query_yellow_green).fetchdf()
#df_yellow_green.head()

In [3]:
import pandas as pd
import numpy as np
import duckdb
from river import preprocessing, cluster
from sklearn.metrics import silhouette_score

# ----------------------------
# 1. Connect to DuckDB and query
# ----------------------------
con = duckdb.connect("my_data.duckdb")
res = con.execute(query_yellow_green)

# ----------------------------
# 2. Build KMeans pipeline
# ----------------------------
pipeline = preprocessing.StandardScaler() | cluster.KMeans(
    n_clusters=5,     # choose initial K
    halflife=0.4,     # controls how fast old info decays
    sigma=3,          # soft-assignment parameter
    seed=42
)

# ----------------------------
# 3. Streaming loop
# ----------------------------

# Buffer for evaluating clustering quality
buffer_size = 5000
buffer = []

while True:
    chunk = res.fetch_df_chunk(vectors_per_chunk=50_000)
    if chunk is None or len(chunk) == 0:
        break

    # Convert to dicts for River
    records = chunk.to_dict(orient="records")

    for r in records:
        # Use all features (drop target if supervised task existed)
        x = r  

        # Learn incrementally
        pipeline.learn_one(x)

        # Keep buffer for evaluation
        buffer.append(list(x.values()))
        if len(buffer) > buffer_size:
            buffer.pop(0)

    # ----------------------------
    # Evaluate clustering (optional)
    # ----------------------------
    if len(buffer) >= 50:  # need at least 2 clusters
        labels = [pipeline.predict_one({k: v for k, v in zip(chunk.columns, row)}) 
                  for row in buffer]
        try:
            sil = silhouette_score(buffer, labels)
            print(f"Silhouette score (approx): {sil:.4f}")
        except Exception:
            pass

# Final cluster centers
print("Cluster centers:", pipeline["KMeans"].centers)


Cluster centers: {0: defaultdict(..., {'total_amount': -0.6141234912723817, 'passenger_count': -0.6473863814617155, 'tip_amount': -0.5903120044434326, 'trip_distance': -0.47490581814410254, 'duration_seconds': -0.020836848810180164}), 1: defaultdict(..., {'total_amount': -4.492060243022872, 'passenger_count': 0.9969550322031457, 'tip_amount': -0.8020124354915046, 'trip_distance': -0.650876052435585, 'duration_seconds': 0.34765436010256523}), 2: defaultdict(..., {'total_amount': 0.6968932107201626, 'passenger_count': 3.490676059797429, 'tip_amount': 1.9699095203960066, 'trip_distance': 0.33152153233149584, 'duration_seconds': -2.2149648070344616}), 3: defaultdict(..., {'total_amount': -3.0439871024631513, 'passenger_count': 0.7390265856336059, 'tip_amount': 3.9332424816121447, 'trip_distance': 0.12497059171015168, 'duration_seconds': -0.3189698813123528}), 4: defaultdict(..., {'total_amount': 1.5953286612026076, 'passenger_count': -4.360635894026034, 'tip_amount': -0.9368319514336794, '

In [10]:
import pandas as pd
import numpy as np
import duckdb
from river import preprocessing, cluster
from sklearn.metrics import silhouette_score

# ----------------------------
# 1. Connect to DuckDB and query
# ----------------------------
con = duckdb.connect("my_data.duckdb")
res = con.execute(query_yellow_green)

# ----------------------------
# 2. Build multiple KMeans pipelines (K = 2..15)
# ----------------------------
k_range = range(2, 16)
pipelines = {
    k: preprocessing.StandardScaler() | cluster.KMeans(
        n_clusters=k, halflife=0.4, sigma=3, seed=42
    )
    for k in k_range
}

# ----------------------------
# 3. Streaming loop
# ----------------------------
buffer_size = 5000
buffer = []

while True:
    chunk = res.fetch_df_chunk(vectors_per_chunk=50_000)
    if chunk is None or len(chunk) == 0:
        break

    records = chunk.to_dict(orient="records")

    for r in records:
        x = r
        for k, pipe in pipelines.items():
            pipe.learn_one(x)

        buffer.append(list(x.values()))
        if len(buffer) > buffer_size:
            buffer.pop(0)

    # ----------------------------
    # Evaluate clustering quality
    # ----------------------------
    if len(buffer) >= 100:
        scores = {}
        for k, pipe in pipelines.items():
            try:
                labels = [pipe.predict_one(dict(zip(records[0].keys(), row))) 
                          for row in buffer]
                if len(set(labels)) > 1:  # need ≥2 clusters
                    scores[k] = silhouette_score(buffer, labels)
            except Exception:
                continue

        if scores:
            best_k = max(scores, key=scores.get)
            print(f"Current best K = {best_k}, silhouette = {scores[best_k]:.4f}")


Current best K = 4, silhouette = 0.5557
Current best K = 5, silhouette = 0.5834


##### Result: the number of clusters that best for model is 5 with silhouette score is 0.5834. The silhouette score ranges from -1 to 1 which 1 represent there is a clear separation in the clustering. The bigger of silhouette score the better model in clustering.

## Run the result of number cluster is 5 to get the center of clusters

In [11]:
import pandas as pd
import numpy as np
import duckdb
from river import preprocessing, cluster
from sklearn.metrics import silhouette_score

# ----------------------------
# 1. Connect to DuckDB and query
# ----------------------------
con = duckdb.connect("my_data.duckdb")
res = con.execute(query_yellow_green)

# ----------------------------
# 2. Build KMeans pipeline
# ----------------------------
pipeline = preprocessing.StandardScaler() | cluster.KMeans(
    n_clusters=5,     # choose initial K
    halflife=0.4,     # controls how fast old info decays
    sigma=3,          # soft-assignment parameter
    seed=42
)

# ----------------------------
# 3. Outlier detection helper (per batch)
# ----------------------------
def remove_outliers(df, z_thresh=3.0):
    """Remove outliers based on Z-score (per numeric column)."""
    numeric_df = df.select_dtypes(include=[np.number])
    z_scores = (numeric_df - numeric_df.mean()) / numeric_df.std(ddof=0)
    mask = (np.abs(z_scores) < z_thresh).all(axis=1)  # keep only non-outliers
    return df[mask]

# ----------------------------
# 4. Streaming loop
# ----------------------------
buffer_size = 5000
buffer = []

while True:
    chunk = res.fetch_df_chunk(vectors_per_chunk=50_000)
    if chunk is None or len(chunk) == 0:
        break

    # --- Outlier removal per batch ---
    chunk = remove_outliers(chunk, z_thresh=3.0)

    # Convert to dicts for River
    records = chunk.to_dict(orient="records")

    for r in records:
        x = r  # all features
        pipeline.learn_one(x)

        # Keep buffer for evaluation
        buffer.append(list(x.values()))
        if len(buffer) > buffer_size:
            buffer.pop(0)

    # ----------------------------
    # Evaluate clustering
    # ----------------------------
    if len(buffer) >= 50:
        labels = [
            pipeline.predict_one({k: v for k, v in zip(chunk.columns, row)})
            for row in buffer
        ]
        try:
            sil = silhouette_score(buffer, labels)
            print(f"Silhouette score (approx): {sil:.4f}")
        except Exception:
            pass

# Final cluster centers
print("Cluster centers:", pipeline['KMeans'].centers)


Silhouette score (approx): 0.0089
Silhouette score (approx): 0.2529
Cluster centers: {0: defaultdict(..., {'total_amount': 0.8779653102027842, 'passenger_count': -0.49876002786219265, 'tip_amount': 0.45024335635523904, 'trip_distance': 0.600756834812259, 'duration_seconds': 1.448113233895696}), 1: defaultdict(..., {'total_amount': -0.6166052981449124, 'passenger_count': -0.3862226519245725, 'tip_amount': -0.6197334958083857, 'trip_distance': -0.6428257185358539, 'duration_seconds': -0.7267978256064775}), 2: defaultdict(..., {'total_amount': 2.5494488735630227, 'passenger_count': -0.4183395084646098, 'tip_amount': 2.576523022210882, 'trip_distance': 2.71471627982732, 'duration_seconds': 1.0167046283138084}), 3: defaultdict(..., {'total_amount': 3.691542889866933, 'passenger_count': -0.5082619742655079, 'tip_amount': 2.7702068425499218, 'trip_distance': -0.6705558019783625, 'duration_seconds': -0.9422935779812189}), 4: defaultdict(..., {'total_amount': -0.0753689229359324, 'passenger_cou

##### Result: The best number of K for clustering is 5 basen on Silhouette score. Feature selection may be considered to increase the model performance.

## 2. Clustering Tipping Pattern on High Value for Hire Vehicles

#### HVFHV query:

In [3]:
query_hvfhv_2019_2023 = """
WITH CTE_hvfhv_2019_2023 AS (
    SELECT 
        hvfhs_license_num AS provider,
        request_datetime AS request_time,
        pickup_datetime AS pick_up_time,
        CAST(trip_miles AS FLOAT) AS trip_distance,
        CAST(trip_time AS INTEGER) AS duration_seconds,
        CAST(base_passenger_fare AS FLOAT) AS base_fare,
        CAST(tolls AS FLOAT) AS toll_fare,
        CAST(bcf AS FLOAT) AS bcf_fare,
        CAST(sales_tax AS FLOAT) AS tax_fare,
        CAST(tips AS FLOAT) AS tip_amount,
        shared_request_flag AS shared_before,
        shared_match_flag AS shared_during,
        wav_request_flag AS wheelchair_request
    FROM 'C:/Users/ekadw/Documents/DATA/NY_Taxi/*/high_volume_for_hire_vehicle/fhvhv_tripdata_*.parquet'
    WHERE hvfhs_license_num IS NOT NULL
        AND request_datetime IS NOT NULL
        AND pickup_datetime IS NOT NULL
        AND trip_miles >= 0
        AND trip_miles <= 50
        AND trip_time >= 0
        AND base_passenger_fare >= 0
        AND tolls >= 0
        AND bcf >= 0
        AND sales_tax >= 0
        AND tips >= 0
        AND shared_request_flag IS NOT NULL
        AND shared_match_flag IS NOT NULL
        AND wav_request_flag IS NOT NULL
        AND request_datetime >= '2019-02-01' 
        AND request_datetime < '2023-10-01'
), CTE_duration_hvfhv_2019_2023 AS (
    SELECT
        provider,
        DATE_DIFF('day', request_time, pick_up_time) AS duration_days,
        EPOCH(pick_up_time - request_time) AS duration_request,
        trip_distance,
        duration_seconds,
        base_fare + toll_fare + bcf_fare + tax_fare  AS total_amount,
        shared_before,
        shared_during,
        wheelchair_request,
        tip_amount
    FROM CTE_hvfhv_2019_2023
)

SELECT 
    provider,
    duration_request,
    trip_distance,
    duration_seconds,
    total_amount,
    shared_before,
    shared_during,
    wheelchair_request,
    tip_amount
FROM CTE_duration_hvfhv_2019_2023
WHERE duration_days = 0
LIMIT 5
"""

con = duckdb.connect()
df_hvfhv_2019_2023 = con.execute(query_hvfhv_2019_2023).fetchdf()
df_hvfhv_2019_2023.head()

Unnamed: 0,provider,duration_request,trip_distance,duration_seconds,total_amount,shared_before,shared_during,wheelchair_request,tip_amount
0,HV0003,232.0,2.45,579,10.41,Y,N,N,0.0
1,HV0003,921.0,1.71,490,8.809999,N,N,N,2.0
2,HV0005,156.0,5.01,2159,50.07,N,Y,N,0.0
3,HV0005,96.0,0.34,179,8.01,N,Y,N,3.0
4,HV0005,207.0,6.84,1799,27.130001,N,Y,N,4.0


#### HVFHV with manual transformation to one hot encoding:

In [4]:
query_hvfhv_2019_2023 = """
WITH CTE_hvfhv_2019_2023 AS (
    SELECT 
        hvfhs_license_num AS provider,
        request_datetime AS request_time,
        pickup_datetime AS pick_up_time,
        CAST(trip_miles AS FLOAT) AS trip_distance,
        CAST(trip_time AS INTEGER) AS duration_seconds,
        CAST(base_passenger_fare AS FLOAT) AS base_fare,
        CAST(tolls AS FLOAT) AS toll_fare,
        CAST(bcf AS FLOAT) AS bcf_fare,
        CAST(sales_tax AS FLOAT) AS tax_fare,
        CAST(tips AS FLOAT) AS tip_amount,
        shared_request_flag AS shared_before,
        shared_match_flag AS shared_during,
        wav_request_flag AS wheelchair_request,
        CAST( CASE tips
            WHEN  0.0 THEN 1
            ELSE 0
        END AS INTEGER) AS tip_category
    FROM 'C:/Users/ekadw/Documents/DATA/NY_Taxi/*/high_volume_for_hire_vehicle/fhvhv_tripdata_*.parquet'
    WHERE hvfhs_license_num IS NOT NULL
        AND request_datetime IS NOT NULL
        AND pickup_datetime IS NOT NULL
        AND trip_miles >= 0
        AND trip_miles <= 50
        AND trip_time >= 0
        AND base_passenger_fare >= 0
        AND tolls >= 0
        AND bcf >= 0
        AND sales_tax >= 0
        AND tips >= 0
        AND shared_request_flag IS NOT NULL
        AND shared_match_flag IS NOT NULL
        AND wav_request_flag IS NOT NULL
        AND request_datetime >= '2019-02-01' 
        AND request_datetime < '2023-10-01'
), CTE_duration_hvfhv_2019_2023 AS (
    SELECT
        provider,
        CAST(CASE provider WHEN 'HV0002' THEN 1 ELSE 0 END AS INTEGER) AS provider_HV0002,
        CAST(CASE provider WHEN 'HV0003' THEN 1 ELSE 0 END AS INTEGER) AS provider_HV0003,
        CAST(CASE provider WHEN 'HV0004' THEN 1 ELSE 0 END AS INTEGER) AS provider_HV0004,
        CAST(CASE provider WHEN 'HV0005' THEN 1 ELSE 0 END AS INTEGER) AS provider_HV0005,
        DATE_DIFF('day', request_time, pick_up_time) AS duration_days,
        CAST(EPOCH(pick_up_time - request_time) AS INTEGER) AS duration_request,
        trip_distance,
        duration_seconds,
        base_fare + toll_fare + bcf_fare + tax_fare + tip_amount AS total_amount,
        shared_before,
        CAST(CASE shared_before WHEN 'Y' THEN 1 ELSE 0 END AS INTEGER) AS shared_before_yes,
        CAST(CASE shared_before WHEN 'N' THEN 1 ELSE 0 END AS INTEGER) AS shared_before_no,
        shared_during,
        CAST(CASE shared_during WHEN 'Y' THEN 1 ELSE 0 END AS INTEGER) AS shared_during_yes,
        CAST(CASE shared_during WHEN 'N' THEN 1 ELSE 0 END AS INTEGER) AS shared_during_no,
        wheelchair_request,
        CAST(CASE wheelchair_request WHEN 'Y' THEN 1 ELSE 0 END AS INTEGER) AS wheelchair_request_yes,
        CAST(CASE wheelchair_request WHEN 'N' THEN 1 ELSE 0 END AS INTEGER) AS wheelchair_request_no,
        tip_category
    FROM CTE_hvfhv_2019_2023
)

SELECT 
    provider_HV0002,
    provider_HV0003,
    provider_HV0004,
    provider_HV0005,
    duration_request,
    trip_distance,
    duration_seconds,
    total_amount,
    shared_before_yes,
    shared_before_no,
    shared_during_yes,
    shared_during_no,
    wheelchair_request_yes,
    wheelchair_request_no,
    tip_category
FROM CTE_duration_hvfhv_2019_2023
WHERE duration_days = 0
LIMIT 5
"""
con = duckdb.connect()
df_hvfhv_2019_2023 = con.execute(query_hvfhv_2019_2023).fetchdf()
df_hvfhv_2019_2023.head()

Unnamed: 0,provider_HV0002,provider_HV0003,provider_HV0004,provider_HV0005,duration_request,trip_distance,duration_seconds,total_amount,shared_before_yes,shared_before_no,shared_during_yes,shared_during_no,wheelchair_request_yes,wheelchair_request_no,tip_category
0,0,1,0,0,232,2.45,579,10.41,1,0,0,1,0,1,1
1,0,1,0,0,921,1.71,490,10.809999,0,1,0,1,0,1,0
2,0,0,0,1,156,5.01,2159,50.07,0,1,1,0,0,1,1
3,0,0,0,1,96,0.34,179,11.01,0,1,1,0,0,1,0
4,0,0,0,1,207,6.84,1799,31.130001,0,1,1,0,0,1,0


#### Query for batching:

In [None]:
query_hvfhv_2019_2023 = """
WITH CTE_hvfhv_2019_2023 AS (
    SELECT 
        hvfhs_license_num AS provider,
        request_datetime AS request_time,
        pickup_datetime AS pick_up_time,
        CAST(trip_miles AS FLOAT) AS trip_distance,
        CAST(trip_time AS INTEGER) AS duration_seconds,
        CAST(base_passenger_fare AS FLOAT) AS base_fare,
        CAST(tolls AS FLOAT) AS toll_fare,
        CAST(bcf AS FLOAT) AS bcf_fare,
        CAST(sales_tax AS FLOAT) AS tax_fare,
        CAST(tips AS FLOAT) AS tip_amount,
        shared_request_flag AS shared_before,
        shared_match_flag AS shared_during,
        wav_request_flag AS wheelchair_request,
        CAST( CASE tips
            WHEN  0.0 THEN 1
            ELSE 0
        END AS INTEGER) AS tip_category
    FROM 'C:/Users/ekadw/Documents/DATA/NY_Taxi/*/high_volume_for_hire_vehicle/fhvhv_tripdata_*.parquet'
    WHERE hvfhs_license_num IS NOT NULL
        AND request_datetime IS NOT NULL
        AND pickup_datetime IS NOT NULL
        AND trip_miles >= 0
        AND trip_miles <= 50
        AND trip_time >= 0
        AND base_passenger_fare >= 0
        AND tolls >= 0
        AND bcf >= 0
        AND sales_tax >= 0
        AND tips >= 0
        AND shared_request_flag IS NOT NULL
        AND shared_match_flag IS NOT NULL
        AND wav_request_flag IS NOT NULL
        AND request_datetime >= '2019-02-01' 
        AND request_datetime < '2023-10-01'
), CTE_duration_hvfhv_2019_2023 AS (
    SELECT
        provider,
        CAST(CASE provider WHEN 'HV0002' THEN 1 ELSE 0 END AS INTEGER) AS provider_HV0002,
        CAST(CASE provider WHEN 'HV0003' THEN 1 ELSE 0 END AS INTEGER) AS provider_HV0003,
        CAST(CASE provider WHEN 'HV0004' THEN 1 ELSE 0 END AS INTEGER) AS provider_HV0004,
        CAST(CASE provider WHEN 'HV0005' THEN 1 ELSE 0 END AS INTEGER) AS provider_HV0005,
        DATE_DIFF('day', request_time, pick_up_time) AS duration_days,
        CAST(EPOCH(pick_up_time - request_time) AS INTEGER) AS duration_request,
        trip_distance,
        duration_seconds,
        base_fare + toll_fare + bcf_fare + tax_fare + tip_amount AS total_amount,
        shared_before,
        CAST(CASE shared_before WHEN 'Y' THEN 1 ELSE 0 END AS INTEGER) AS shared_before_yes,
        CAST(CASE shared_before WHEN 'N' THEN 1 ELSE 0 END AS INTEGER) AS shared_before_no,
        shared_during,
        CAST(CASE shared_during WHEN 'Y' THEN 1 ELSE 0 END AS INTEGER) AS shared_during_yes,
        CAST(CASE shared_during WHEN 'N' THEN 1 ELSE 0 END AS INTEGER) AS shared_during_no,
        wheelchair_request,
        CAST(CASE wheelchair_request WHEN 'Y' THEN 1 ELSE 0 END AS INTEGER) AS wheelchair_request_yes,
        CAST(CASE wheelchair_request WHEN 'N' THEN 1 ELSE 0 END AS INTEGER) AS wheelchair_request_no,
        tip_category
    FROM CTE_hvfhv_2019_2023
)

SELECT 
    provider_HV0002,
    provider_HV0003,
    provider_HV0004,
    provider_HV0005,
    duration_request,
    trip_distance,
    duration_seconds,
    total_amount,
    shared_before_yes,
    shared_before_no,
    shared_during_yes,
    shared_during_no,
    wheelchair_request_yes,
    wheelchair_request_no,
    tip_category
FROM CTE_duration_hvfhv_2019_2023
WHERE duration_days = 0
"""

In [None]:
import pandas as pd
import numpy as np
import duckdb
from river import preprocessing, cluster
from sklearn.metrics import silhouette_score

# ----------------------------
# 1. Connect to DuckDB and query
# ----------------------------
con = duckdb.connect("my_data.duckdb")
res = con.execute(df_hvfhv_2019_2023)

# ----------------------------
# 2. Build multiple KMeans pipelines (K = 2..15)
# ----------------------------
k_range = range(2, 16)
pipelines = {
    k: preprocessing.StandardScaler() | cluster.KMeans(
        n_clusters=k, halflife=0.4, sigma=3, seed=42
    )
    for k in k_range
}

# ----------------------------
# 3. Streaming loop
# ----------------------------
buffer_size = 5000
buffer = []

while True:
    chunk = res.fetch_df_chunk(vectors_per_chunk=50_000)
    if chunk is None or len(chunk) == 0:
        break

    records = chunk.to_dict(orient="records")

    for r in records:
        x = r
        for k, pipe in pipelines.items():
            pipe.learn_one(x)

        buffer.append(list(x.values()))
        if len(buffer) > buffer_size:
            buffer.pop(0)

    # ----------------------------
    # Evaluate clustering quality
    # ----------------------------
    if len(buffer) >= 100:
        scores = {}
        for k, pipe in pipelines.items():
            try:
                labels = [pipe.predict_one(dict(zip(records[0].keys(), row))) 
                          for row in buffer]
                if len(set(labels)) > 1:  # need ≥2 clusters
                    scores[k] = silhouette_score(buffer, labels)
            except Exception:
                continue

        if scores:
            best_k = max(scores, key=scores.get)
            print(f"Current best K = {best_k}, silhouette = {scores[best_k]:.4f}")
