In [1]:
import os

import duckdb
import numpy as np
import pyarrow as pa
import pyarrow.compute
import pyarrow.parquet
import ray

In [2]:
from uptrain.quick_ops.ops_agg import CosineDist, L2Dist, PartitionOp
from uptrain.quick_ops.ops_io import DuckdbReader, ParquetWriter

In [3]:
# Start a local Ray instance. The runtime environment sets
# NUMBA_DISABLE_PERFORMANCE_WARNINGS=1 for the Ray processes
# (presumably to silence Numba performance warnings in the workers -- TODO confirm).
runtime_env = {"env_vars": {"NUMBA_DISABLE_PERFORMANCE_WARNINGS": "1"}}
ray.init(runtime_env=runtime_env)

2023-03-28 21:50:24,825	INFO worker.py:1553 -- Started a local Ray instance.


0,1
Python version:,3.10.9
Ray version:,2.3.0


In [4]:
import atexit

# Register `ray.shutdown` to be called when the interpreter exits.
atexit.register(ray.shutdown)

<function ray._private.worker.shutdown(_exiting_interpreter: bool = False)>

In [5]:
# Directory holding the dataset files that the cells below list and read.
# The location can be overridden with the UPTRAIN_VIEWS_DATASET_DIR environment
# variable so the notebook runs on machines other than the original author's;
# when the variable is unset the original path is used unchanged.
DATASET_DIR = os.environ.get(
    "UPTRAIN_VIEWS_DATASET_DIR",
    "/Users/ishananand/repos/datasets/uptrain/views_dataset/",
)

[2m[33m(raylet)[0m   aiogrpc.init_grpc_aio()


## line chart metrics

In [44]:
# Partition rows by (postId, model_type, sig_type) and attach four aggregations,
# each reading values from `embs` with `views` as the sequence column:
# CosineDist and L2Dist, each in "initial" and "running" mode.
# (Presumably distance to the first vs. the previous embedding of a partition --
# TODO confirm against uptrain.quick_ops.ops_agg.)
metric_op = PartitionOp(
    columns=["postId", "model_type", "sig_type"],
    agg_ops={
        "cos_initial": CosineDist(value_col="embs", seq_col="views", mode="initial"),
        "cos_running": CosineDist(value_col="embs", seq_col="views", mode="running"),
        "l2_initial": L2Dist(value_col="embs", seq_col="views", mode="initial"),
        "l2_running": L2Dist(value_col="embs", seq_col="views", mode="running"),
    },
)
# The op is executed through an actor handle (see the `.run.remote(...)` call below).
metric_actor = metric_op.make_actor()

In [45]:
import tqdm

# Feed the first 10 dataset files (sorted by file name) through the metric actor,
# one file at a time.
fnames = sorted(os.listdir(DATASET_DIR))
for fname in tqdm.tqdm(fnames[:10]):
    ds = pa.parquet.read_table(str(os.path.join(DATASET_DIR, fname)))
    result_ref = metric_actor.run.remote(ds)
    # Wait for this file's result before submitting the next file.
    result = ray.get(result_ref)
# NOTE(review): `ds` and `result` are reassigned on every iteration, so after the
# loop they hold the LAST file only. The DuckDB queries further down read them by
# name (`FROM result`, `FROM ds`) -- confirm that analysing only the last file is intended.

100%|███████████████████████████████████████████████████████████████████| 10/10 [00:15<00:00,  1.55s/it]


## statistics at each checkpoint

In [46]:
# Values of the `views` column at which statistics are taken; generate_case_stmt()
# turns each one into a "views BETWEEN lo AND hi" window.
checkpoints = [0, 1000, 2500, 5000, 10000, 25000, 50000, 100000]

In [47]:
def generate_case_stmt(checkpoints):
    """Build a SQL ``CASE`` expression that maps ``views`` onto a checkpoint.

    Every checkpoint ``p`` gets the window ``[max(0, p - slack), p + slack]``.
    A row whose ``views`` lies inside a window evaluates to ``p``; rows outside
    every window evaluate to ``NULL``. The slack of a checkpoint is one tenth of
    the gap to the previous checkpoint. The first checkpoint has no predecessor,
    so it uses half of the second checkpoint's slack; when there is only one
    checkpoint there is no gap at all and the slack is 0 (exact match).

    Args:
        checkpoints: Non-empty sequence of numeric checkpoint values. Slacks are
            derived from consecutive differences, so the values are expected to
            be in ascending order.

    Returns:
        The ``CASE ... END`` expression as a string, one ``WHEN`` branch per
        checkpoint, in the order given.

    Raises:
        ValueError: If ``checkpoints`` is empty (a ``CASE`` without any ``WHEN``
            branch is not valid SQL).
    """
    checkpoints = list(checkpoints)
    if not checkpoints:
        raise ValueError("checkpoints must contain at least one value")

    # One slack per gap between neighbouring checkpoints (10% of the gap).
    slacks = [int(gap / 10) for gap in np.diff(checkpoints)]
    # Prepend the slack for the first checkpoint; a lone checkpoint has no gaps.
    slacks.insert(0, int(slacks[0] / 2) if slacks else 0)

    branches = [
        f"WHEN views BETWEEN {max(0, point - slack)} AND {point + slack} THEN {point}"
        for slack, point in zip(slacks, checkpoints)
    ]
    return "\n".join(["CASE", *branches, "ELSE NULL", "END"])

In [48]:
# Tag every row of `result` with the checkpoint whose window contains its `views`
# value and keep only the rows that fall inside some window. DuckDB reads the
# in-memory `result` table by its Python variable name.
group_query_str = f"""
SELECT 
    *,
    {generate_case_stmt(checkpoints)} AS checkpoint
FROM
    result
WHERE
    checkpoint IS NOT NULL
"""

tbl = duckdb.execute(group_query_str).fetch_arrow_table()

In [50]:
# Show the columns and types of the checkpoint-tagged table.
tbl.schema

model_type: string
sig_type: string
postId: int64
embs: list<l: double>
  child 0, l: double
bias: double
tagGenre: string
views: int64
emb_update_time: timestamp[us, tz=UTC]
partition_index: string
cos_initial: double
cos_running: double
l2_initial: double
l2_running: double
checkpoint: int32

In [55]:
# Per checkpoint: mean and standard deviation of `cos_initial`, plus the count of `postId`.
tbl.group_by(["checkpoint"]).aggregate(
    [("cos_initial", "mean"), ("cos_initial", "stddev"), ("postId", "count")]
)

pyarrow.Table
cos_initial_mean: double
cos_initial_stddev: double
postId_count: int64
checkpoint: int32
----
cos_initial_mean: [[0.0026516188134688232,0.012198132251292416,0.010676663928267329,0.09756951672937267,0.07245846191322035,0.019940778003835643,0.007679197579266151,0.021165858092912403]]
cos_initial_stddev: [[0.004141232561326782,0.02889598657493061,0.02427104726545848,0.13888052290350913,0.13452522400308492,0.03591387452039128,0.004051337233501998,0.03074757927831714]]
postId_count: [[2430,862,699,1274,571,897,656,433]]
checkpoint: [[0,1000,2500,10000,5000,50000,100000,25000]]

*** SIGTERM received at time=1680030233 ***
PC: @        0x187606d54  (unknown)  kevent
[2023-03-29 00:33:53,917 E 6589 165776] logging.cc:361: *** SIGTERM received at time=1680030233 ***
[2023-03-29 00:33:53,917 E 6589 165776] logging.cc:361: PC: @        0x187606d54  (unknown)  kevent


## distance metrics between pairs of posts at each checkpoint

In [19]:
# Same checkpoint tagging as for the metric results, but applied to the raw
# `ds` table: each row gets the checkpoint whose window contains its `views`
# value, and rows outside every window are dropped. DuckDB reads the in-memory
# `ds` table by its Python variable name.
group_query_str = f"""
SELECT 
    *,
    {generate_case_stmt(checkpoints)} AS checkpoint
FROM
    ds
WHERE
    checkpoint IS NOT NULL
"""

tbl = duckdb.execute(group_query_str).fetch_arrow_table()

In [24]:
# Count of `postId` per checkpoint.
tbl.group_by(["checkpoint"]).aggregate([("postId", "count")])

pyarrow.Table
postId_count: int64
checkpoint: int32
----
postId_count: [[2430,862,571,897,656,699,1274,433]]
checkpoint: [[0,1000,5000,50000,100000,2500,10000,25000]]

In [28]:
from uptrain.quick_ops.ops_agg import compute_op_cosine_dist_running

In [39]:
# For every checkpoint, draw NUM_SAMPLES random pairs of rows from the
# checkpoint-tagged table and pass the two `embs` values of each pair to
# compute_op_cosine_dist_running, keeping the first element of its return value.
# `results` ends up with one array of NUM_SAMPLES values per checkpoint, in the
# order of `checkpoints`.
rng = np.random.default_rng(42)  # fixed seed so the sampled pairs are reproducible
NUM_SAMPLES = 1000

results = []
for point in checkpoints:
    # Use the fully qualified `pa.compute` (available through the top-of-notebook
    # `import pyarrow.compute`); no `pc` alias is defined in the visible cells,
    # so `pc.field(...)` raises NameError on a fresh kernel.
    sub_tbl = tbl.filter(pa.compute.field("checkpoint") == point)
    values = sub_tbl["embs"].to_numpy()

    # The two indices of a pair are drawn independently, so a pair may contain
    # the same row twice.
    sample_pair_indices = rng.choice(len(values), size=(NUM_SAMPLES, 2))
    out = []
    for i, j in sample_pair_indices:
        val, _ = compute_op_cosine_dist_running(values[i], values[j])
        out.append(val)
    results.append(np.array(out))

In [40]:
# One line per entry of `results` (i.e. per checkpoint): mean and standard
# deviation of the sampled pairwise values.
for res in results:
    print(res.mean(), res.std())

0.46764481540916913 0.2410999163475373
0.4080929200570356 0.13504422765044968
0.4194197950267693 0.1488175745557255
0.5224944343948985 0.17563401941367637
0.5482238137975906 0.23671032842008488
0.7108836637784443 0.2608231597483439
0.6674368399762961 0.2774992745418799
0.681480015911964 0.35700323510553544


## appendix