In [0]:
# STEP 1: Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import ArrayType, FloatType
from sentence_transformers import SentenceTransformer
import pandas as pd

spark = SparkSession.builder.getOrCreate()

In [0]:
df = spark.read.csv("/Workspace/Users/mayank.kakkar96@gmail.com/home_shopping_metadata.csv", header=True, inferSchema=True)
df.show(5)

+---+-------------------+-----------+--------------------+---------+------+
| id|       product_name|   category|         description|    brand| price|
+---+-------------------+-----------+--------------------+---------+------+
|  1|CosmoLife Face Wash|     Beauty|The Face Wash fro...|CosmoLife| 80.31|
|  2|SmartEase Knife Set|Kitchenware|The Knife Set fro...|SmartEase|661.09|
|  3|    ZenHaus Perfume|     Beauty|The Perfume from ...|  ZenHaus|553.93|
|  4|  DailyLux Sneakers|    Fashion|The Sneakers from...| DailyLux|292.73|
|  5|   CozyHome Blender| Appliances|The Blender from ...| CozyHome|430.53|
+---+-------------------+-----------+--------------------+---------+------+
only showing top 5 rows


In [0]:
# Save as Delta table for future use
df.write.format("delta").mode("overwrite").saveAsTable("home_shopping_metadata_delta")

In [0]:
# Load open-source model
model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

# Read the Delta table
df = spark.table("home_shopping_metadata_delta")

# Convert to Pandas to compute embeddings
pdf = df.toPandas()

# Create embeddings for each description
pdf["embedding"] = pdf["description"].apply(lambda x: model.encode(x).tolist())

# Convert back to Spark DataFrame
df_embedded = spark.createDataFrame(pdf)

modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

In [0]:
# Drop table if it exists to avoid schema merge conflicts
spark.sql("DROP TABLE IF EXISTS home_shopping_metadata_embedded")

# Ensure embedding column is array<float>
df_fixed = df_embedded.withColumn("embedding", col("embedding").cast(ArrayType(FloatType())))

# Save as new Delta table
df_fixed.write.format("delta").mode("overwrite").saveAsTable("home_shopping_metadata_embedded")

# Verify schema
spark.table("home_shopping_metadata_embedded").printSchema()

root
 |-- id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- description: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- embedding: array (nullable = true)
 |    |-- element: float (containsNull = true)



In [0]:
from databricks.vector_search.client import VectorSearchClient

vsc = VectorSearchClient()
endpoint_name = "vs_endpoint"        # Should already exist in your workspace
index_name = "home_shopping_index"

# Get embedding dimension (MiniLM = 384)
embedding_dim = model.get_sentence_embedding_dimension()

# Create the index
vsc.create_delta_sync_index(pipeline_type='batch',
    endpoint_name=endpoint_name,
    index_name=index_name,
    source_table_name="home_shopping_metadata_embedded",
    primary_key="id",
    embedding_source_column="embedding",
    embedding_dimension=embedding_dim
)

[NOTICE] Using a notebook authentication token. Recommended for development only. For improved performance, please use Service Principal based authentication. To disable this message, pass disable_notice=True.


[0;31m---------------------------------------------------------------------------[0m
[0;31mAssertionError[0m                            Traceback (most recent call last)
File [0;32m<command-5441652853024861>, line 11[0m
[1;32m      8[0m embedding_dim [38;5;241m=[39m model[38;5;241m.[39mget_sentence_embedding_dimension()
[1;32m     10[0m [38;5;66;03m# Create the index[39;00m
[0;32m---> 11[0m vsc[38;5;241m.[39mcreate_delta_sync_index(pipeline_type[38;5;241m=[39m[38;5;124m'[39m[38;5;124mbatch[39m[38;5;124m'[39m,
[1;32m     12[0m     endpoint_name[38;5;241m=[39mendpoint_name,
[1;32m     13[0m     index_name[38;5;241m=[39mindex_name,
[1;32m     14[0m     source_table_name[38;5;241m=[39m[38;5;124m"[39m[38;5;124mhome_shopping_metadata_embedded[39m[38;5;124m"[39m,
[1;32m     15[0m     primary_key[38;5;241m=[39m[38;5;124m"[39m[38;5;124mid[39m[38;5;124m"[39m,
[1;32m     16[0m     embedding_source_column[38;5;241m=[39m[38;5;124m"[39m[

In [0]:
index = vsc.get_index("your_embedding_endpoint_name", "home_shopping_vector_index")

query = "affordable furniture for living room"
results = index.similarity_search(
    query_text=query,
    columns=["id", "product_name", "category", "price", "description"],
    num_results=5
)

display(results)

{"ts": "2025-11-09 22:39:41.274", "level": "ERROR", "logger": "pyspark.sql.connect.logging", "msg": "GRPC Error received", "context": {}, "exception": {"class": "_MultiThreadedRendezvous", "msg": "<_MultiThreadedRendezvous of RPC that terminated with:\n\tstatus = StatusCode.INTERNAL\n\tdetails = \"[DELTA_FAILED_TO_MERGE_FIELDS] Failed to merge fields 'embedding' and 'embedding'.\"\n\tdebug_error_string = \"UNKNOWN:Error received from peer ipv4:127.0.0.1:7073 {created_time:\"2025-11-09T22:39:41.273558343+00:00\", grpc_status:13, grpc_message:\"[DELTA_FAILED_TO_MERGE_FIELDS] Failed to merge fields \\'embedding\\' and \\'embedding\\'.\"}\"\n>", "stacktrace": [{"class": null, "method": "_execute_and_fetch_as_iterator", "file": "/databricks/python/lib/python3.12/site-packages/pyspark/sql/connect/client/core.py", "line": "2019"}, {"class": null, "method": "__next__", "file": "<frozen _collections_abc>", "line": "356"}, {"class": null, "method": "send", "file": "/databricks/python/lib/python3

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-5441652853024860>, line 5[0m
[1;32m      2[0m [38;5;28;01mfrom[39;00m [38;5;21;01mpyspark[39;00m[38;5;21;01m.[39;00m[38;5;21;01msql[39;00m[38;5;21;01m.[39;00m[38;5;21;01mfunctions[39;00m [38;5;28;01mimport[39;00m col
[1;32m      4[0m df_fixed [38;5;241m=[39m df_embedded[38;5;241m.[39mwithColumn([38;5;124m"[39m[38;5;124membedding[39m[38;5;124m"[39m, col([38;5;124m"[39m[38;5;124membedding[39m[38;5;124m"[39m)[38;5;241m.[39mcast(ArrayType(FloatType())))
[0;32m----> 5[0m df_fixed[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m)[38;5;241m.[39mmode([38;5;124m"[39m[38;5;124moverwrite[39m[38;5;124m"[39m)[38;5;241m.[39msaveAsTable([38;5;124m"[39m[38;5;124mhome_shopping_metadata_embedded[39m[38;5;124

In [0]:
from databricks.vector_search.client import VectorSearchClient

# Initialize client
vsc = VectorSearchClient()

# Define the names
endpoint_name = "vs_endpoint"
index_name = "home_shopping_index"
table_name = "home_shopping_metadata_embedded"

# Step 1: Get the Delta table path or name
# Assuming your table is already created in Databricks as:
# df_with_embeddings.write.format("delta").saveAsTable(table_name)

# Step 2: Get embedding dimension from your model
from sentence_transformers import SentenceTransformer
model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
embedding_dim = model.get_sentence_embedding_dimension()

print(f"Embedding dimension: {embedding_dim}")  # Should print 384

# Step 3: Create the index
vsc.create_delta_sync_index(pipeline_type='vector_search',
    endpoint_name=endpoint_name,
    index_name=index_name,
    source_table_name=table_name,
    primary_key="id",
    embedding_source_column="embedding",
    embedding_dimension=embedding_dim
)

[NOTICE] Using a notebook authentication token. Recommended for development only. For improved performance, please use Service Principal based authentication. To disable this message, pass disable_notice=True.
Embedding dimension: 384


[0;31m---------------------------------------------------------------------------[0m
[0;31mAssertionError[0m                            Traceback (most recent call last)
File [0;32m<command-5441652853024855>, line 23[0m
[1;32m     20[0m [38;5;28mprint[39m([38;5;124mf[39m[38;5;124m"[39m[38;5;124mEmbedding dimension: [39m[38;5;132;01m{[39;00membedding_dim[38;5;132;01m}[39;00m[38;5;124m"[39m)  [38;5;66;03m# Should print 384[39;00m
[1;32m     22[0m [38;5;66;03m# Step 3: Create the index[39;00m
[0;32m---> 23[0m vsc[38;5;241m.[39mcreate_delta_sync_index(pipeline_type[38;5;241m=[39m[38;5;124m'[39m[38;5;124mvector_search[39m[38;5;124m'[39m,
[1;32m     24[0m     endpoint_name[38;5;241m=[39mendpoint_name,
[1;32m     25[0m     index_name[38;5;241m=[39mindex_name,
[1;32m     26[0m     source_table_name[38;5;241m=[39mtable_name,
[1;32m     27[0m     primary_key[38;5;241m=[39m[38;5;124m"[39m[38;5;124mid[39m[38;5;124m"[39m,
[1;32m     28