In [77]:
from clickhouse_connect import get_client

In [78]:
# Open the ClickHouse connection used by every query in this notebook.
# Only the database is set explicitly; host/port/credentials fall back to
# clickhouse_connect.get_client's own defaults.
client = get_client(
    database="nyc_taxi",
)

In [111]:
uids = {"array_of_strings": "array(pickup_ntaname, dropoff_ntaname)",
        "hash_of_strings": "cityHash64((pickup_ntaname,dropoff_ntaname))",
        "hash_of_hashes": "cityHash64((cityHash64(pickup_ntaname),cityHash64(dropoff_ntaname)))"}

queries = {}
queries_data = {}
for k, v in uids.items():
    query = f"""select 
                {v} as uid, 
                toDate(pickup_datetime) as date, 
                sum(total_amount) as amount
            from trips_small
            where pickup_ntaname != '' and dropoff_ntaname != ''
            group by uid, date
            order by date"""

    print(k)
    queries_data[k] = query
    %timeit -n 5 -r 3 results = client.query(query)

array_of_strings
479 ms ± 33.8 ms per loop (mean ± std. dev. of 3 runs, 5 loops each)
hash_of_strings
326 ms ± 17.5 ms per loop (mean ± std. dev. of 3 runs, 5 loops each)
hash_of_hashes
267 ms ± 24 ms per loop (mean ± std. dev. of 3 runs, 5 loops each)


In [112]:
queries_labels = {}
for k, q in queries_data.items():
    new_q = f"""select d1.uid, d1.date as prediction_date,
                if(d2.date is not null and d2.date <= d1.date + interval 7 day, 1, 0) as churn
                from data d1 asof join data d2
                on d1.uid=d2.uid
                and d1.date < d2.date
                order by date"""
    query = f"with data as ({q})" + new_q
    print(k)
    %timeit -n 5 -r 3 results = client.query(query)
    queries_labels[k] = f"with data as ({q}), labels as ({new_q})"


array_of_strings
913 ms ± 118 ms per loop (mean ± std. dev. of 3 runs, 5 loops each)
hash_of_strings
533 ms ± 46.2 ms per loop (mean ± std. dev. of 3 runs, 5 loops each)
hash_of_hashes
387 ms ± 20.1 ms per loop (mean ± std. dev. of 3 runs, 5 loops each)


In [113]:
queries_features = {}
for k, q in queries_labels.items():
    new_q = """select uid, date, amount,
                avg(amount) over (
                    partition by uid 
                    order by date 
                    range between 2 preceding and current row
                ) as avg3d
                from data
                order by date"""
    print(k)
    query = q + new_q
    %timeit -n 5 -r 3 results = client.query(query)
    queries_features[k] = q + f",features as ({new_q})"


array_of_strings
792 ms ± 14.5 ms per loop (mean ± std. dev. of 3 runs, 5 loops each)
hash_of_strings
339 ms ± 12.6 ms per loop (mean ± std. dev. of 3 runs, 5 loops each)
hash_of_hashes
287 ms ± 5.25 ms per loop (mean ± std. dev. of 3 runs, 5 loops each)


In [114]:
for k, q in queries_features.items():
    new_q = f"""select 
                l.uid, 
                l.prediction_date, 
                round(f.amount,2) as daily, 
                round(f.avg3d,2) avg3d, 
                l.churn 
            from labels l asof join features f 
            on l.uid=f.uid 
            and l.prediction_date >= f.date """
    query = q + new_q
    print(k)
    %timeit -n 5 -r 3 results = client.query(query)
    
final_query = query

array_of_strings
1.58 s ± 61.6 ms per loop (mean ± std. dev. of 3 runs, 5 loops each)
hash_of_strings
821 ms ± 23.9 ms per loop (mean ± std. dev. of 3 runs, 5 loops each)
hash_of_hashes
681 ms ± 23.2 ms per loop (mean ± std. dev. of 3 runs, 5 loops each)


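Using an integer uid (a hash of the two NTA names) as the group / order / partition key is faster than using an array of strings: the full pipeline takes ~1.58 s with `array_of_strings`, ~0.82 s with `hash_of_strings` and ~0.68 s with `hash_of_hashes`.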

IMPORTANT NOTE (to verify): ASOF JOIN may need time-ordered data, and window functions can benefit from it.



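Spot check (not a full test) that there is no future leakage:

Bath Beach -> Bath Beach route

2015-08-17: avg3d (3.80) equals that day's amount, so it does not use data from the next day (2015-08-18)
2015-08-18: avg3d (8.38) is the mean of 3.80 and 12.96, so it does include the previous day, as intended

2015-08-28: churn 1 -- the next entry (2015-08-31) is within 7 days (in this query churn = 1 means "seen again within 7 days")
2015-08-31: churn 0 -- the next entry is 2015-09-14, more than 7 days later; the label is not leaked from the previous entry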

In [115]:
import pandas as pd

# Materialise the final training set (hash_of_hashes variant) as a DataFrame.
results = client.query(final_query)
results_df = pd.DataFrame(results.result_rows, columns=results.column_names)

# Spot-check one route. Compute its uid with the same SQL expression that built the data,
# so the lookup cannot drift from the pipeline; a single matching trip is enough.
bath_uid_expr = uids["hash_of_hashes"]
bath_rows = client.query(f"""select {bath_uid_expr}
                    from trips_small where
                    pickup_ntaname = 'Bath Beach' and
                    dropoff_ntaname = 'Bath Beach'
                    limit 1""").result_rows
if not bath_rows:
    raise ValueError("no Bath Beach -> Bath Beach trips found in trips_small")
bath = bath_rows[0][0]

results_df[results_df.uid == bath].sort_values('prediction_date')

                        uid prediction_date  daily  avg3d  churn
29057   7989843086573400367      2015-07-19   4.80   4.80      0
89744   7989843086573400367      2015-08-17   3.80   3.80      1
90747   7989843086573400367      2015-08-18  12.96   8.38      1
107990  7989843086573400367      2015-08-25   4.30   4.30      1
114474  7989843086573400367      2015-08-28   3.80   3.80      1
122325  7989843086573400367      2015-08-31   4.30   4.30      0
147060  7989843086573400367      2015-09-14   8.60   8.60      0
