In [None]:
# ---- Parameters you'll tune ----
SOURCE_TABLE = "SCRAPEDATA"
TEXT_COL = "CAPTION"

# Output tables
EMBED_TABLE = "SCRAPEDATA_CAPTION_EMBEDS"
CLUSTER_TABLE = "SCRAPEDATA_CAPTION_CLUSTERS"

# Model + clustering hyperparams
EMBED_MODEL = "snowflake-arctic-embed-m"   # model name passed to AI_EMBED; "e5-base-v2" is another common choice
K = 12                                     # number of clusters; start here and tune later
RANDOM_STATE = 42                          # seed for MiniBatchKMeans

# For large tables, fit on a sample first, then score all rows
FIT_SAMPLE_N = 50    # rows sampled for fitting (None/0 = fit on every row); increase if warehouse & budget allow
BATCH_SIZE = 200     # rows per scoring / write-back batch

# Fail fast on settings that would otherwise break later cells.
assert K >= 1, "K must be a positive number of clusters"
assert BATCH_SIZE > 0, "BATCH_SIZE must be positive (the scoring loop advances by BATCH_SIZE)"
assert not FIT_SAMPLE_N or FIT_SAMPLE_N >= K, "FIT_SAMPLE_N must be >= K (k-means needs at least K samples)"


In [None]:
import json

import numpy as np
import pandas as pd
from sklearn.cluster import MiniBatchKMeans
from snowflake.snowpark import Session
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import col, sql_expr

# Reuse the Snowpark session already active in this environment rather than
# building one from credentials; every later cell runs its SQL / DataFrame
# work through `session`.
session = get_active_session()

In [None]:
# Create the embeddings table on first run: one row per DISTINCT non-empty text value.
# Embedding each distinct text once avoids paying AI_EMBED repeatedly for duplicate
# captions and keeps content_hash unique, so later joins on it don't multiply rows.
# Because of IF NOT EXISTS, an existing table is left untouched.

create_embeds_sql = f"""
CREATE TABLE IF NOT EXISTS {EMBED_TABLE} AS
SELECT
  /* Keyed by a hash of the text. If you need per-row IDs from the source table,
     drop the DISTINCT below and carry the ID column through instead. */
  SHA2(TO_VARCHAR(content), 256) AS content_hash,
  content,
  AI_EMBED('{EMBED_MODEL}', content) AS embedding,
  CURRENT_TIMESTAMP() AS embedded_at,
  '{EMBED_MODEL}' AS embedding_model
FROM (
  SELECT DISTINCT {TEXT_COL} AS content
  FROM {SOURCE_TABLE}
  WHERE {TEXT_COL} IS NOT NULL
    AND LENGTH(TRIM({TEXT_COL})) > 0
)
;
"""
session.sql(create_embeds_sql).collect()

# # If the table already exists and you want to embed only texts not seen yet:
# insert_new_sql = f"""
# INSERT INTO {EMBED_TABLE} (content_hash, content, embedding, embedded_at, embedding_model)
# SELECT
#   SHA2(TO_VARCHAR(s.content), 256) AS content_hash,
#   s.content,
#   AI_EMBED('{EMBED_MODEL}', s.content) AS embedding,
#   CURRENT_TIMESTAMP() AS embedded_at,
#   '{EMBED_MODEL}' AS embedding_model
# FROM (
#   SELECT DISTINCT {TEXT_COL} AS content
#   FROM {SOURCE_TABLE}
#   WHERE {TEXT_COL} IS NOT NULL
#     AND LENGTH(TRIM({TEXT_COL})) > 0
# ) s
# LEFT JOIN {EMBED_TABLE} e
#   ON e.content_hash = SHA2(TO_VARCHAR(s.content), 256)
# WHERE e.content_hash IS NULL
# ;
# """
# session.sql(insert_new_sql).collect()


In [None]:
# Only the key and the vector are needed to fit the model.
df_embed = session.table(EMBED_TABLE).select("content_hash", "embedding")

# Fit on a random subset when FIT_SAMPLE_N is set; a falsy value means "use every row".
# NOTE(review): the row-count sample below is not seeded, so the fit sample (and the
# resulting centroids) can differ between runs.
if FIT_SAMPLE_N:
    df_fit = df_embed.sample(n=FIT_SAMPLE_N)
else:
    df_fit = df_embed

pdf_fit = df_fit.to_pandas()
pdf_fit.head()


In [None]:
def to_vec(x):
    """Convert one fetched embedding value into a float32 numpy vector.

    Args:
        x: An embedding taken from a pandas cell. Normally list-like; a JSON
            array string (semi-structured columns may come back this way) is
            parsed first. ``None`` or a float NaN (pandas' missing marker)
            means "no embedding".

    Returns:
        A 1-D ``np.ndarray`` of dtype float32, or ``None`` for a missing value.
    """
    # NaN must be caught here: np.asarray(nan) would yield a 0-d array that
    # later breaks np.vstack instead of being filtered out.
    if x is None or (isinstance(x, float) and np.isnan(x)):
        return None
    if isinstance(x, str):
        x = json.loads(x)
    return np.asarray(x, dtype=np.float32)

# Convert every fetched embedding once and drop rows without a usable vector,
# filtering hashes and vectors TOGETHER so X[i] is the embedding of hashes_fit[i].
fit_pairs = [
    (content_hash, vec)
    for content_hash, vec in zip(
        pdf_fit["CONTENT_HASH"].tolist(),
        (to_vec(v) for v in pdf_fit["EMBEDDING"].tolist()),
    )
    if vec is not None
]
if not fit_pairs:
    raise ValueError("The fit sample contains no non-null embeddings; nothing to cluster.")

hashes_fit = [content_hash for content_hash, _ in fit_pairs]
X = np.vstack([vec for _, vec in fit_pairs])

X.shape


In [None]:
# Mini-batch k-means with a fixed seed; "auto" lets scikit-learn choose n_init.
kmeans = MiniBatchKMeans(
    n_clusters=K,
    n_init="auto",
    batch_size=2048,
    random_state=RANDOM_STATE,
)
kmeans.fit(X)

# Sanity check: how many fit-sample rows land in each cluster id.
labels_fit = kmeans.predict(X)
pd.Series(labels_fit).value_counts().sort_index()


In [None]:
# Results table: one row per scored embedding, tagged with the K and model that produced it.
session.sql(f"""
CREATE TABLE IF NOT EXISTS {CLUSTER_TABLE} (
  content_hash STRING,
  cluster_id INTEGER,
  k INTEGER,
  embedding_model STRING,
  scored_at TIMESTAMP_NTZ
);
""").collect()

# The scoring step only appends, so re-running the notebook would duplicate (or, with a
# different fit sample, contradict) rows for this (K, model) pair. Clearing them first
# keeps Restart & Run All idempotent; set to False to keep earlier runs' rows.
replace_previous_run = True
if replace_previous_run:
    session.sql(f"""
    DELETE FROM {CLUSTER_TABLE}
    WHERE k = {K}
      AND embedding_model = '{EMBED_MODEL}';
    """).collect()


In [None]:
# Score every row of EMBED_TABLE with the fitted model, BATCH_SIZE rows per page,
# and append the assignments to CLUSTER_TABLE.
# NOTE(review): every page re-sorts the table for LIMIT/OFFSET; for very large tables
# consider streaming a single query instead (e.g. DataFrame.to_pandas_batches()).
total = session.table(EMBED_TABLE).count()

# range() raises on BATCH_SIZE == 0 instead of looping forever.
for offset in range(0, total, BATCH_SIZE):
    batch_pdf = (
        session.table(EMBED_TABLE)
        .select("content_hash", "embedding")
        .sort(col("content_hash"))  # deterministic page order for LIMIT/OFFSET
        .limit(BATCH_SIZE, offset=offset)
        .to_pandas()
    )
    if batch_pdf.empty:
        break

    # Convert once and filter hashes and vectors together; filtering the two lists
    # independently made their lengths disagree whenever an embedding was missing.
    batch_pairs = [
        (content_hash, vec)
        for content_hash, vec in zip(
            batch_pdf["CONTENT_HASH"].tolist(),
            (to_vec(v) for v in batch_pdf["EMBEDDING"].tolist()),
        )
        if vec is not None
    ]
    if not batch_pairs:
        continue  # nothing scorable on this page; np.vstack([]) would raise

    hb = [content_hash for content_hash, _ in batch_pairs]
    Xb = np.vstack([vec for _, vec in batch_pairs])
    preds = kmeans.predict(Xb)

    out_pdf = pd.DataFrame({
        "CONTENT_HASH": hb,
        "CLUSTER_ID": preds.astype(int),
        "K": K,
        "EMBEDDING_MODEL": EMBED_MODEL,
    })

    out_sdf = session.create_dataframe(out_pdf).with_column(
        "SCORED_AT", sql_expr("CURRENT_TIMESTAMP()")
    )
    out_sdf.write.mode("append").save_as_table(CLUSTER_TABLE)

session.table(CLUSTER_TABLE).filter((col("K") == K) & (col("EMBEDDING_MODEL") == EMBED_MODEL)).count()



In [None]:
# Join this run's cluster assignments back to the text and show a few examples per cluster.
clusters_sdf = (
    session.table(CLUSTER_TABLE)
    # CLUSTER_TABLE can hold several runs; cluster ids are only comparable within
    # one (K, embedding model) pair, so restrict to the current one.
    .filter((col("k") == K) & (col("embedding_model") == EMBED_MODEL))
    .select("content_hash", "cluster_id")
)
content_sdf = session.table(EMBED_TABLE).select("content_hash", "content")

# Join on the shared column name; the previous condition referenced the
# nonexistent columns "content_hashc" / "content_hashe".
joined = (
    clusters_sdf
    .join(content_sdf, on="content_hash", how="inner")
    .select(col("cluster_id"), col("content"))
    .filter(col("cluster_id").is_not_null())
)

# Pull a small random sample for exploration in the notebook UI,
# ordered so each cluster's examples are listed together.
pdf_examples = joined.sample(n=2000).to_pandas()
pdf_examples.sort_values("CLUSTER_ID").groupby("CLUSTER_ID").head(5)
