## Install app requirements

In [None]:
!pip install --upgrade pip
!pip install -r requirements.txt
!source .venv/bin/activate

___

### Load environment variables

In [1]:
from commons.constants import *
mdb_url = MONGO_PRODUCTS_COLLECTION_URL

## Connect to Spark Cluster using SparkConnect

In [2]:
from analytics.sc_session import get_session
session = get_session()
session

<pyspark.sql.connect.session.SparkSession at 0x106ed9870>

In [3]:
from analytics.sc_data import load_products_collection
df = load_products_collection(session)
df

DataFrame[_id: struct<product_id:bigint>, product_created_at: string, product_deleted_at: void, product_name: string, product_properties: string, product_sku: string, product_updated_at: string]

In [None]:
df.printSchema()

In [None]:
df.createOrReplaceTempView("mbdtfm_magento_products_tmp")
df5 = session.sql("""
    select _id.product_id, product_name
    from mbdtfm_magento_products_tmp
    where product_deleted_at is null
    order by product_name asc limit 5
  """)
df5.show()

In [4]:
df.show(n=10)

+----+--------------------+------------------+--------------------+--------------------+-----------+--------------------+
| _id|  product_created_at|product_deleted_at|        product_name|  product_properties|product_sku|  product_updated_at|
+----+--------------------+------------------+--------------------+--------------------+-----------+--------------------+
| {2}|2025-08-08T09:29:58Z|              NULL|Strive Shoulder Pack|{"product_sku":"2...|    24-MB04|2025-08-08T09:29:58Z|
| {3}|2025-08-08T09:29:58Z|              NULL|Crown Summit Back...|{"product_sku": "...|    24-MB03|2025-08-08T09:29:58Z|
| {5}|2025-08-08T09:29:58Z|              NULL|Rival Field Messe...|{"product_sku":"2...|    24-MB06|2025-08-08T09:29:58Z|
| {6}|2025-08-08T09:29:58Z|              NULL|     Fusion Backpack|{"product_sku": "...|    24-MB02|2025-08-08T09:29:58Z|
| {8}|2025-08-08T09:29:59Z|              NULL|     Voyage Yoga Bag|{"product_sku": "...|    24-WB01|2025-08-08T09:29:59Z|
|{10}|2025-08-08T09:29:5

In [None]:
df.describe().show()

In [None]:
df.columns

In [5]:
import json
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.sql import DataFrame
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
from typing import Dict

def create_product_embeddings(df: DataFrame, properties_column: str = "product_properties", embeddings_column: str = "product_embeddings") -> None:
    """
    Reads products from MongoDB, calculates embeddings for the `product_properties`
    field in collection `mbdtfm_magento_catalog_products_mview`, and writes back to MongoDB
    with the new `product_embeddings` column in the same collection.
    :param df: DataFrame containing the `product_properties` column to create embeddings from.
    :param properties_column: Name of the `product_properties` column to create embeddings from.
    :param embeddings_column: Name of the `product_embeddings` column to create embeddings to.
    :return: None.
    """
    flatten_udf = udf(__flatten_json, StringType())
    df_flat = df.withColumn(f"{properties_column}_flat", flatten_udf(col(properties_column)))
    source_column = f"{properties_column}_flat"
    df = __compute_embeddings(df_flat, embeddings_column, source_column)
    (df.write.format("mongodb")
        .option("spark.mongodb.write.connection.uri", MONGO_URL)
        .option("database", MONGO_DATABASE)
        .option("collection", MONGO_PRODUCTS_COLLECTION)
        .mode("append")
        .save())


def __compute_embeddings(df: DataFrame, target_column: str, source_column: str) -> DataFrame:
    tokenizer = Tokenizer(inputCol=source_column, outputCol="tokens")
    tokenized = tokenizer.transform(df)
    hashing_tf = HashingTF(inputCol="tokens", outputCol="rawFeatures", numFeatures=128)
    featurized = hashing_tf.transform(tokenized)
    idf = IDF(inputCol="rawFeatures", outputCol=target_column)
    idf_model = idf.fit(featurized)
    return idf_model.transform(featurized)

def __flatten_json(json_str: str) -> str:
    try:
        flat = __flatten(json.loads(json_str))
        return ' '.join([f"{k}={v}" for k, v in flat.items()])
    except Exception:
        return ''

def __flatten(obj: Dict, prefix: str = '') -> Dict[str, str]:
    """
    Recursive helper to flatten JSON objects inside a UDF, preparing data for tokenization.
    :param obj: JSON object.
    :param prefix: prefix to prepend to all keys.
    :return: flattened dictionary.
    """
    items = []
    for k, v in obj.items():
        new_key = f"{prefix}{k}" if prefix == '' else f"{prefix}.{k}"
        if isinstance(v, dict):
            items.extend(__flatten(v, new_key).items()) # Recursive call.
        elif isinstance(v, list):
            val_str = ' '.join([str(item) for item in v])
            items.append((new_key, val_str))
        else:
            items.append((new_key, str(v)))
    return dict(items)

In [6]:
create_product_embeddings(df)

df.columns

+---+--------------------+------------------+--------------------+--------------------+-----------+--------------------+-----------------------+
|_id|  product_created_at|product_deleted_at|        product_name|  product_properties|product_sku|  product_updated_at|product_properties_flat|
+---+--------------------+------------------+--------------------+--------------------+-----------+--------------------+-----------------------+
|{2}|2025-08-08T09:29:58Z|              NULL|Strive Shoulder Pack|{"product_sku":"2...|    24-MB04|2025-08-08T09:29:58Z|   product_sku=24-MB...|
|{3}|2025-08-08T09:29:58Z|              NULL|Crown Summit Back...|{"product_sku": "...|    24-MB03|2025-08-08T09:29:58Z|   product_sku=24-MB...|
|{5}|2025-08-08T09:29:58Z|              NULL|Rival Field Messe...|{"product_sku":"2...|    24-MB06|2025-08-08T09:29:58Z|   product_sku=24-MB...|
|{6}|2025-08-08T09:29:58Z|              NULL|     Fusion Backpack|{"product_sku": "...|    24-MB02|2025-08-08T09:29:58Z|   product

SparkException: Writing job failed.

JVM stacktrace:
org.apache.spark.SparkException
	at org.apache.spark.sql.errors.QueryExecutionErrors$.writingJobFailedError(QueryExecutionErrors.scala:893)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:452)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:397)
	at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:237)
	at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:360)
	at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:358)
	at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:237)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$2(QueryExecution.scala:155)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:162)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:268)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:124)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
	at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:186)
	at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:124)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:291)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:123)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:77)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:233)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:155)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$eagerlyExecute$1(QueryExecution.scala:154)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:169)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:164)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:470)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:470)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:360)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:356)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:446)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:164)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyCommandExecuted$1(QueryExecution.scala:126)
	at scala.util.Try$.apply(Try.scala:217)
	at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
	at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439)
	at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:131)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:192)
	at org.apache.spark.sql.classic.DataFrameWriter.runCommand(DataFrameWriter.scala:622)
	at org.apache.spark.sql.classic.DataFrameWriter.saveInternal(DataFrameWriter.scala:190)
	at org.apache.spark.sql.classic.DataFrameWriter.save(DataFrameWriter.scala:126)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteOperation(SparkConnectPlanner.scala:2954)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2492)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:322)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:224)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:196)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:341)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:341)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
	at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:186)
	at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
	at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:340)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:196)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:125)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:347)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4.0 (TID 7) (172.19.0.4 executor 6): com.mongodb.spark.sql.connector.exceptions.DataException: Cannot cast (128,[0,1,2,3,4,5,6,7,8,10,11,12,13,14,15,16,17,18,19,20,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,38,39,40,41,42,43,44,45,46,47,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,98,99,100,102,103,104,105,106,107,108,109,111,112,113,114,115,116,117,118,119,120,121,122,123,124,126,127],[3.0,2.0,5.0,2.0,7.0,5.0,1.0,1.0,5.0,2.0,3.0,1.0,3.0,1.0,2.0,1.0,8.0,4.0,1.0,1.0,2.0,3.0,1.0,2.0,3.0,1.0,4.0,4.0,6.0,2.0,3.0,1.0,1.0,3.0,2.0,2.0,4.0,2.0,5.0,5.0,3.0,5.0,2.0,1.0,3.0,3.0,1.0,1.0,5.0,2.0,2.0,4.0,3.0,3.0,5.0,4.0,3.0,5.0,3.0,4.0,3.0,3.0,3.0,2.0,2.0,1.0,1.0,2.0,2.0,1.0,4.0,2.0,3.0,6.0,2.0,4.0,3.0,2.0,1.0,3.0,2.0,3.0,8.0,3.0,2.0,2.0,3.0,1.0,5.0,1.0,4.0,3.0,5.0,3.0,2.0,2.0,2.0,3.0,2.0,3.0,5.0,5.0,2.0,1.0,5.0,1.0,2.0,1.0,3.0,2.0,5.0,5.0,2.0,1.0,1.0,2.0,3.0,6.0]) into a BsonValue. org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 has no matching BsonValue. Error: Cannot cast (128,[0,1,2,3,4,5,6,7,8,10,11,12,13,14,15,16,17,18,19,20,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,38,39,40,41,42,43,44,45,46,47,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,98,99,100,102,103,104,105,106,107,108,109,111,112,113,114,115,116,117,118,119,120,121,122,123,124,126,127],[3.0,2.0,5.0,2.0,7.0,5.0,1.0,1.0,5.0,2.0,3.0,1.0,3.0,1.0,2.0,1.0,8.0,4.0,1.0,1.0,2.0,3.0,1.0,2.0,3.0,1.0,4.0,4.0,6.0,2.0,3.0,1.0,1.0,3.0,2.0,2.0,4.0,2.0,5.0,5.0,3.0,5.0,2.0,1.0,3.0,3.0,1.0,1.0,5.0,2.0,2.0,4.0,3.0,3.0,5.0,4.0,3.0,5.0,3.0,4.0,3.0,3.0,3.0,2.0,2.0,1.0,1.0,2.0,2.0,1.0,4.0,2.0,3.0,6.0,2.0,4.0,3.0,2.0,1.0,3.0,2.0,3.0,8.0,3.0,2.0,2.0,3.0,1.0,5.0,1.0,4.0,3.0,5.0,3.0,2.0,2.0,2.0,3.0,2.0,3.0,5.0,5.0,2.0,1.0,5.0,1.0,2.0,1.0,3.0,2.0,5.0,5.0,2.0,1.0,1.0,2.0,3.0,6.0]) into a BsonValue. org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 data type has no matching BsonValue.
	at com.mongodb.spark.sql.connector.schema.RowToBsonDocumentConverter.lambda$createObjectToBsonValue$be3154de$1(RowToBsonDocumentConverter.java:140)
	at com.mongodb.spark.sql.connector.schema.RowToBsonDocumentConverter.lambda$dataTypeToBsonElement$300467b5$1(RowToBsonDocumentConverter.java:153)
	at com.mongodb.spark.sql.connector.schema.RowToBsonDocumentConverter.rowToBsonDocument(RowToBsonDocumentConverter.java:270)
	at com.mongodb.spark.sql.connector.schema.RowToBsonDocumentConverter.fromRow(RowToBsonDocumentConverter.java:116)
	at com.mongodb.spark.sql.connector.schema.RowToBsonDocumentConverter.fromRow(RowToBsonDocumentConverter.java:105)
	at com.mongodb.spark.sql.connector.write.MongoDataWriter.write(MongoDataWriter.java:91)
	at com.mongodb.spark.sql.connector.write.MongoDataWriter.write(MongoDataWriter.java:44)
	at org.apache.spark.sql.connector.write.DataWriter.writeAll(DataWriter.java:108)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.write(WriteToDataSourceV2Exec.scala:587)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$5(WriteToDataSourceV2Exec.scala:483)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1323)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:535)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:466)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:584)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:427)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
	Suppressed: com.mongodb.spark.sql.connector.exceptions.DataException: Write aborted for: PartitionId: 0, TaskId: 7. Manual data clean up may be required.
		at com.mongodb.spark.sql.connector.write.MongoDataWriter.abort(MongoDataWriter.java:121)
		at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$10(WriteToDataSourceV2Exec.scala:528)
		at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1334)
		... 15 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$3(DAGScheduler.scala:2935)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2935)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2927)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2927)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1295)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1295)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1295)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3207)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3141)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3130)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:50)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2484)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:424)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:397)
	at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:237)
	at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:360)
	at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:358)
	at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:237)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$2(QueryExecution.scala:155)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:162)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:268)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:124)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
	at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:186)
	at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:124)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:291)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:123)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:77)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:233)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:155)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$eagerlyExecute$1(QueryExecution.scala:154)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:169)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:164)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:470)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:470)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:360)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:356)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:446)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:164)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyCommandExecuted$1(QueryExecution.scala:126)
	at scala.util.Try$.apply(Try.scala:217)
	at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
	at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46)
	at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46)
	at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:131)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:192)
	at org.apache.spark.sql.classic.DataFrameWriter.runCommand(DataFrameWriter.scala:622)
	at org.apache.spark.sql.classic.DataFrameWriter.saveInternal(DataFrameWriter.scala:190)
	at org.apache.spark.sql.classic.DataFrameWriter.save(DataFrameWriter.scala:126)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteOperation(SparkConnectPlanner.scala:2954)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2492)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:322)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:224)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:196)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:341)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:341)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
	at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:186)
	at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
	at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:340)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:196)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:125)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:347)
Caused by: com.mongodb.spark.sql.connector.exceptions.DataException: Cannot cast (128,[0,1,2,3,4,5,6,7,8,10,11,12,13,14,15,16,17,18,19,20,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,38,39,40,41,42,43,44,45,46,47,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,98,99,100,102,103,104,105,106,107,108,109,111,112,113,114,115,116,117,118,119,120,121,122,123,124,126,127],[3.0,2.0,5.0,2.0,7.0,5.0,1.0,1.0,5.0,2.0,3.0,1.0,3.0,1.0,2.0,1.0,8.0,4.0,1.0,1.0,2.0,3.0,1.0,2.0,3.0,1.0,4.0,4.0,6.0,2.0,3.0,1.0,1.0,3.0,2.0,2.0,4.0,2.0,5.0,5.0,3.0,5.0,2.0,1.0,3.0,3.0,1.0,1.0,5.0,2.0,2.0,4.0,3.0,3.0,5.0,4.0,3.0,5.0,3.0,4.0,3.0,3.0,3.0,2.0,2.0,1.0,1.0,2.0,2.0,1.0,4.0,2.0,3.0,6.0,2.0,4.0,3.0,2.0,1.0,3.0,2.0,3.0,8.0,3.0,2.0,2.0,3.0,1.0,5.0,1.0,4.0,3.0,5.0,3.0,2.0,2.0,2.0,3.0,2.0,3.0,5.0,5.0,2.0,1.0,5.0,1.0,2.0,1.0,3.0,2.0,5.0,5.0,2.0,1.0,1.0,2.0,3.0,6.0]) into a BsonValue. org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 has no matching BsonValue. Error: Cannot cast (128,[0,1,2,3,4,5,6,7,8,10,11,12,13,14,15,16,17,18,19,20,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,38,39,40,41,42,43,44,45,46,47,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,98,99,100,102,103,104,105,106,107,108,109,111,112,113,114,115,116,117,118,119,120,121,122,123,124,126,127],[3.0,2.0,5.0,2.0,7.0,5.0,1.0,1.0,5.0,2.0,3.0,1.0,3.0,1.0,2.0,1.0,8.0,4.0,1.0,1.0,2.0,3.0,1.0,2.0,3.0,1.0,4.0,4.0,6.0,2.0,3.0,1.0,1.0,3.0,2.0,2.0,4.0,2.0,5.0,5.0,3.0,5.0,2.0,1.0,3.0,3.0,1.0,1.0,5.0,2.0,2.0,4.0,3.0,3.0,5.0,4.0,3.0,5.0,3.0,4.0,3.0,3.0,3.0,2.0,2.0,1.0,1.0,2.0,2.0,1.0,4.0,2.0,3.0,6.0,2.0,4.0,3.0,2.0,1.0,3.0,2.0,3.0,8.0,3.0,2.0,2.0,3.0,1.0,5.0,1.0,4.0,3.0,5.0,3.0,2.0,2.0,2.0,3.0,2.0,3.0,5.0,5.0,2.0,1.0,5.0,1.0,2.0,1.0,3.0,2.0,5.0,5.0,2.0,1.0,1.0,2.0,3.0,6.0]) into a BsonValue. org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 data type has no matching BsonValue.
	at com.mongodb.spark.sql.connector.schema.RowToBsonDocumentConverter.lambda$createObjectToBsonValue$be3154de$1(RowToBsonDocumentConverter.java:140)
	at com.mongodb.spark.sql.connector.schema.RowToBsonDocumentConverter.lambda$dataTypeToBsonElement$300467b5$1(RowToBsonDocumentConverter.java:153)
	at com.mongodb.spark.sql.connector.schema.RowToBsonDocumentConverter.rowToBsonDocument(RowToBsonDocumentConverter.java:270)
	at com.mongodb.spark.sql.connector.schema.RowToBsonDocumentConverter.fromRow(RowToBsonDocumentConverter.java:116)
	at com.mongodb.spark.sql.connector.schema.RowToBsonDocumentConverter.fromRow(RowToBsonDocumentConverter.java:105)
	at com.mongodb.spark.sql.connector.write.MongoDataWriter.write(MongoDataWriter.java:91)
	at com.mongodb.spark.sql.connector.write.MongoDataWriter.write(MongoDataWriter.java:44)
	at org.apache.spark.sql.connector.write.DataWriter.writeAll(DataWriter.java:108)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.write(WriteToDataSourceV2Exec.scala:587)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$5(WriteToDataSourceV2Exec.scala:483)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1323)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:535)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:466)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:584)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:427)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.lang.Thread.run(Thread.java:1583)