In [0]:
spark.conf.set("spark.mongodb.input.uri", "mongodb://root:root@104.197.171.58:27017")
spark.conf.set("spark.mongodb.output.uri", "mongodb://root:root@104.197.171.58:27017")

In [0]:
from pyspark.sql import types as T, functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

In [0]:
stream_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "104.197.171.58:9092")
    .option("subscribe","raw_data")
    .option("startingOffsets","latest")
    .option("failOnDataLoss","false")
    .load()
)

In [0]:
SCHEMA = T.StructType([
    T.StructField('data', T.ArrayType(T.StructType([
        T.StructField('c', T.StringType(), True),
        T.StructField('p', T.FloatType(), True),
        T.StructField('s', T.StringType(), True),
        T.StructField('t', T.LongType(), True),
        T.StructField('v', T.FloatType(), True)
    ]))),
    T.StructField('type', T.StringType(), True)
])

In [0]:
stream_df = stream_df.select(
    F.from_json(
        F.col("value").cast(T.StringType()),
        SCHEMA
    ).alias("raw")
)

In [0]:
parsed_stream = stream_df\
    .selectExpr("explode (raw.data) as data")\
    .selectExpr(
        "data.c as TradeConditions", 
        "data.p as LastPriceUSD", 
        "data.s as Symbol", 
        "data.t as UNIXTimestamp", 
        "data.v as Volume"
    )

In [0]:
parsed_stream = parsed_stream.withColumn(
    "Timestamp", 
    F.current_timestamp()
)

### Predictions

In [0]:
def use_batches(batch, _):
    batch.show()
    # Creando el assembler de características
    assembler = VectorAssembler(
        inputCols=["UNIXTimestamp"],
        outputCol="features"
    )
    # Definiendo modelo de ML
    lr_model = LinearRegression(
        featuresCol="features",
        labelCol="LastPriceUSD",
        predictionCol="prediction"
    )
    # Creando Pipeline para definir etapas
    pipeline = Pipeline(stages=[assembler, lr_model])

    # Entrenar el modelo
    model = pipeline.fit(batch)

    # Predecir
    predictions = model.transform(batch)
    predictions = predictions.drop("features")
    
    predictions.show()

    predictions.write\
        .format("com.mongodb.spark.sql.DefaultSource")\
        .mode("append")\
        .option("uri", "mongodb://root:root@104.197.171.58")\
        .option("database", "proyecto3DB")\
        .option("collection", "procesed_data")\
        .save()



In [0]:
# send to kafka topic
parsed_stream\
    .writeStream\
    .foreachBatch(use_batches)\
    .start()\
    .awaitTermination()
     

+---------------+------------+---------------+-------------+-------+--------------------+
|TradeConditions|LastPriceUSD|         Symbol|UNIXTimestamp| Volume|           Timestamp|
+---------------+------------+---------------+-------------+-------+--------------------+
|           NULL|    37781.77|BINANCE:BTCUSDT|1700937063952|0.06887|2023-11-25 18:31:...|
+---------------+------------+---------------+-------------+-------+--------------------+

+---------------+------------+---------------+-------------+-------+--------------------+--------------+
|TradeConditions|LastPriceUSD|         Symbol|UNIXTimestamp| Volume|           Timestamp|    prediction|
+---------------+------------+---------------+-------------+-------+--------------------+--------------+
|           NULL|    37781.77|BINANCE:BTCUSDT|1700937063952|0.06887|2023-11-25 18:31:...|37781.76953125|
+---------------+------------+---------------+-------------+-------+--------------------+--------------+

+---------------+------

ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/databricks/spark/python/pyspark/sql/utils.py", line 117, in call
    raise e
  File "/databricks/spark/python/pyspark/sql/utils.py", line 114, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "/root/.ipykernel/1720/command-3185636810591886-1119477644", line 18, in use_batches
    model = pipeline.fit(batch)
  File "/databricks/python_shell/dbruntime/MLWorkloadsInstrumentation/_pyspark.py", line 30, in patched_method
    result = original_method(self, *args, **kwargs)
  File "/databricks/spark/python/pyspark/ml/base.py", line 205, in fit
    return self._fit(dataset)
  File "/databricks/spark/python/pyspark/ml/pipeline.py", line 134, in _fit
    model 

[0;31m---------------------------------------------------------------------------[0m
[0;31mStreamingQueryException[0m                   Traceback (most recent call last)
File [0;32m<command-3185636810591887>, line 6[0m
[1;32m      1[0m [38;5;66;03m# send to kafka topic[39;00m
[1;32m      2[0m [43mparsed_stream[49m[43m\[49m
[1;32m      3[0m [43m    [49m[38;5;241;43m.[39;49m[43mwriteStream[49m[43m\[49m
[1;32m      4[0m [43m    [49m[38;5;241;43m.[39;49m[43mforeachBatch[49m[43m([49m[43muse_batches[49m[43m)[49m[43m\[49m
[1;32m      5[0m [43m    [49m[38;5;241;43m.[39;49m[43mstart[49m[43m([49m[43m)[49m[43m\[49m
[0;32m----> 6[0m [43m    [49m[38;5;241;43m.[39;49m[43mawaitTermination[49m[43m([49m[43m)[49m

File [0;32m/databricks/spark/python/pyspark/sql/streaming/query.py:206[0m, in [0;36mStreamingQuery.awaitTermination[0;34m(self, timeout)[0m
[1;32m    204[0m     [38;5;28;01mreturn[39;00m [38;5;28mself[39m[38;5;241m