In [None]:
import os
# Ask pyspark to pull in the Kafka source connector when it launches the JVM.
# This is set before the SparkSession is built in a later cell.
# NOTE(review): the connector is pinned to 3.0.0 / Scala 2.12 — confirm it
# matches the Spark version actually installed in this environment.
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell"

In [None]:
from pyspark.sql import SparkSession
import pandas as pd
import uuid
import random
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import requests
import torch

ModuleNotFoundError: No module named 'findspark'

In [None]:
from sentence_transformers import SentenceTransformer

# Sentence-embedding model shared by the embedding helpers defined below.
# trust_remote_code=True lets the model repository's own Python code run
# locally — only keep it for repositories you trust.
# NOTE(review): a later cell calls `model(**inputs)` as if `model` were a
# RoBERTa transformer; confirm which object `model` is meant to be there.
model = SentenceTransformer("nomic-ai/nomic-embed-text-v1", trust_remote_code=True)


In [None]:
# Create the Spark session for this notebook (or reuse the active one).
spark = (
    SparkSession.builder
    .appName('RealtimeKafkaML')
    .getOrCreate()
)

In [None]:
# Streaming DataFrame over the Kafka topic; only messages that arrive after
# the query starts are read ("latest" offsets).
df_raw = (
    spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', "broker:29092")
    .option("startingOffsets", "latest")
    .option('subscribe', "datipipe")
    .load()
)

In [None]:
# Keep only the Kafka message payload, cast to a string column named `json`.
df_json = df_raw.selectExpr('CAST(value AS STRING) as json')

In [None]:
# Schema of one article object; every field is a nullable string, and
# `source` is a nested struct with `name` and `id`.
article_schema = (
    StructType()
    .add("url", StringType(), True)
    .add("publishedAt", StringType(), True)
    .add("description", StringType(), True)
    .add(
        "source",
        StructType()
        .add("name", StringType(), True)
        .add("id", StringType(), True),
        True,
    )
    .add("title", StringType(), True)
    .add("urlToImage", StringType(), True)
    .add("content", StringType(), True)
    .add("author", StringType(), True)
)

# Schema of the whole JSON message carried in the Kafka value.
schema = (
    StructType()
    .add("@timestamp", StringType(), True)
    .add("articles", article_schema, True)
    .add("@version", StringType(), True)
    .add("status", StringType(), True)
    .add("totalResults", StringType(), True)
)

In [None]:
class CustomEmbeddingFunction:
    """Callable adapter that turns text into embedding vectors.

    Instances are passed to Chroma as ``embedding_function``; Chroma calls the
    instance with ``input`` and expects one embedding per text.

    Args:
        encoder: Optional object exposing
            ``encode(list_of_texts, convert_to_tensor=False)`` whose result has
            ``tolist()``. When omitted, the notebook-level ``model`` is used,
            which keeps ``CustomEmbeddingFunction()`` working as before.
    """

    def __init__(self, encoder=None):
        # Bind the encoder once so later rebinding of the global `model`
        # does not silently change what this instance uses.
        self.model = encoder if encoder is not None else model

    def __call__(self, input):
        """Embed a single text or a list of texts.

        Args:
            input: A string, or a list of strings. (The parameter name is kept
                as ``input`` because callers pass it by that name.)

        Returns:
            A list with one embedding (list of floats) per text. Empty or
            ``None`` texts yield ``[]`` in their position.
        """
        if not isinstance(input, list):
            return [self.generate_embeddings(input)]

        # Encode all non-empty texts in one batch instead of one model call
        # per text, then put each vector back at its original position.
        indexed_texts = [(pos, text) for pos, text in enumerate(input) if text]
        results = [[] for _ in input]
        if indexed_texts:
            vectors = self.model.encode(
                [text for _, text in indexed_texts], convert_to_tensor=False
            ).tolist()
            for (pos, _), vector in zip(indexed_texts, vectors):
                results[pos] = vector
        return results

    def generate_embeddings(self, text):
        """Return the embedding of one text, or ``[]`` when the text is empty."""
        if not text:
            return []
        return self.model.encode([text], convert_to_tensor=False).tolist()[0]


Column<'json'>

In [None]:
import chromadb
# Create the embedding-function instance
embedding_function = CustomEmbeddingFunction()

# Open a persistent Chroma client (no path given, so the library default is
# used) and get or create the "test" collection with that embedding function
client = chromadb.PersistentClient()
collection = client.get_or_create_collection(name="test", embedding_function=embedding_function)


In [None]:

# Prepare the sample texts
sentences = ["Who is Laurens van der Maaten?", "What is machine learning?"]

# Add the texts to the collection; no embeddings are passed here, so the
# collection's embedding function is expected to compute them.
# NOTE(review): the ids are fixed, so re-running this cell re-submits the same
# ids — confirm how the installed Chroma version handles duplicates.
collection.add(documents=sentences, ids = ["id4","id5"])

In [None]:
# One-shot streaming run: parse each JSON payload with `schema` and print the
# article description to the console, blocking until the run finishes.
(
    df_json
    .select(from_json(df_json.json, schema).alias('rowdata'))
    .select('rowdata.articles.description')
    .writeStream
    .trigger(once=True)
    .format("console")
    .start()
    .awaitTermination()
)

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

# Define the UDF with an explicit array<float> return type
@udf(returnType=ArrayType(FloatType()))
def get_embeddings(text):
    """Embed one text value with the notebook-level sentence model.

    Args:
        text: The string to embed; Spark passes ``None`` for null cells.

    Returns:
        The embedding as a list of floats, or ``None`` (a null cell) when the
        text is null or empty, so such rows never reach the model.
    """
    if not text:
        return None
    return model.encode([text]).tolist()[0]

# Apply the UDF to the DataFrame, adding an `embeddings` column.
# NOTE(review): the UDF is fed the whole raw `json` string, not a parsed field
# such as the article description — confirm this is intended.
df_embeddings = df_json.withColumn("embeddings", get_embeddings(col("json")))
    

In [None]:
import chromadb
from dotenv import load_dotenv
import os

# Load settings from the local env file so STORAGE_PATH can be read below.
load_dotenv('.env.local')

# Fail fast when the storage location is not configured.
storage_path = os.getenv('STORAGE_PATH')
if storage_path is None:
    raise ValueError('STORAGE_PATH environment variable is not set')

# NOTE(review): this rebinds `client` and `collection` from the earlier cell,
# and this collection is opened without the custom embedding function used
# there — confirm both are intended.
client = chromadb.PersistentClient(path=storage_path)

collection = client.get_or_create_collection(name="test")

In [None]:
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_chroma import Chroma

# Directory handed to the LangChain Chroma vector store as its persistence location.
persist_directory = "./chroma/"

In [None]:
# Chunks shorter than this many characters are dropped after splitting.
MIN_CHUNK_CHARS = 100

# Split documents into overlapping chunks of at most 1000 characters.
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)

# NOTE(review): `df_json` is the streaming Spark DataFrame built earlier, while
# split_documents presumably expects LangChain Document objects — TODO confirm
# and convert the articles to Documents before this call.
splits = text_splitter.split_documents(df_json)

# Build a new list instead of removing items from the list being iterated,
# which would skip the element following each removed one.
splits = [chunk for chunk in splits if len(chunk.page_content) >= MIN_CHUNK_CHARS]

In [None]:
from langchain_community.embeddings import OllamaEmbeddings
# Embedding backend used when building the LangChain vector store.
# NOTE(review): a later cell rebinds `embeddings` to the return value of
# get_roberta_embeddings, so re-running cells out of order changes what this
# name refers to.
embeddings = OllamaEmbeddings(model = "nomic-embed-text")

In [None]:
# Build the vector store from the chunks in `splits`, embedding them with
# `embeddings` and persisting under `persist_directory`.
vectorstore = Chroma.from_documents(documents=splits,
                                    embedding=embeddings, 
                                    persist_directory=persist_directory)


In [None]:
# Similarity-search retriever configured with k=6 results per query.
retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 6})

In [None]:
# Run a sample similarity query against the vector store.
retrieved_docs = retriever.invoke("What happend in Gaza?")

# Indexing [0] on an empty result list would raise IndexError, so report the
# empty case explicitly instead.
if retrieved_docs:
    print(retrieved_docs[0].page_content)
else:
    print("No documents retrieved for this query.")

In [None]:
def clean_string(text):
    """Remove or replace problematic characters in a string.

    Double and single quotes are deleted, newlines and carriage returns become
    spaces, and every run of non-ASCII characters is collapsed to one space.

    Args:
        text: The value to clean.

    Returns:
        The cleaned string, or ``""`` when ``text`` is not a ``str``.
    """
    # Imported here because the notebook never imports `re` at the top level,
    # so the original body raised NameError on its first string input.
    import re

    if not isinstance(text, str):
        return ""

    text = text.replace('"', '')
    text = text.replace("'", "")
    text = text.replace('\n', ' ').replace('\r', ' ')
    return re.sub(r'[^\x00-\x7F]+', ' ', text)


In [None]:
def get_roberta_embeddings(text):
    """Extract embeddings for ``text`` using RoBERTa.

    Relies on the notebook-level names ``tokenizer`` and ``model``.
    NOTE(review): ``tokenizer`` is not defined in any visible cell, and the
    visible ``model`` is a SentenceTransformer — confirm a RoBERTa tokenizer
    and model are bound to these names before this function is called.

    Returns:
        The first-token slice of ``last_hidden_state`` as a NumPy array.
    """
    encoded = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
    # Inference only: no gradients are tracked for the forward pass.
    with torch.no_grad():
        hidden_states = model(**encoded).last_hidden_state
    # Keep only the vector at sequence position 0 (the first token).
    return hidden_states[:, 0, :].numpy()


In [None]:
# NOTE(review): `description` and `collection2` are not defined in any visible
# cell — TODO define them before running this cell. This assumes
# `description` is a single string, so exactly one embedding row is produced.
embeddings = get_roberta_embeddings(description)

# Store the embedding together with its source text. Chroma's Collection.add
# needs an id per record and takes the text as `documents`; it has no
# `description` parameter, so the original call raised TypeError.
collection2.add(
    ids=[str(uuid.uuid4())],
    embeddings=embeddings.tolist(),
    documents=[description],
)

        

In [32]:
# One-shot streaming run: parse the payload, pass the raw JSON through
# `roberta_udf`, and print description plus embeddings to the console.
# NOTE(review): `roberta_udf` and `schema_output` are not defined in any
# visible cell — confirm they exist before running.
(
    df_json
    .select(
        from_json(df_json.json, schema).alias('rowdata'),
        from_json(roberta_udf(df_json.json), schema_output).alias('response'),
    )
    .select('rowdata.articles.description', 'response.embeddings')
    .writeStream
    .trigger(once=True)
    .format("console")
    .start()
    .awaitTermination()
)


StreamingQueryException: [STREAM_FAILED] Query [id = 1c633fd9-e354-472a-ae15-7924d2445658, runId = a2eda904-57cd-483e-98cb-3fda0d905e86] terminated with exception: Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 5) (45fd94edc999 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/tmp/ipykernel_152/1599525886.py", line 8, in <lambda>
  File "/tmp/ipykernel_152/1599525886.py", line 5, in apply_sentiment_analysis
  File "/opt/conda/lib/python3.11/site-packages/requests/api.py", line 115, in post
    return request("post", url, data=data, json=json, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/requests/api.py", line 59, in request
    return session.request(method=method, url=url, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/requests/sessions.py", line 589, in request
    resp = self.send(prep, **send_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/requests/sessions.py", line 703, in send
    r = adapter.send(request, **kwargs)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/requests/adapters.py", line 519, in send
    raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=9000): Max retries exceeded with url: /predict (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7fa018a2bc90>: Failed to establish a new connection: [Errno 111] Connection refused'))

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:441)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:486)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:425)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:491)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:388)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Driver stacktrace:

In [None]:
# Same projection as the console run, but written once as JSON files.
# NOTE(review): `roberta_udf` and `schema_output` are not defined in any
# visible cell, and the output and checkpoint paths are hard-coded absolute
# locations — confirm both before running.
(
    df_json
    .select(
        from_json(df_json.json, schema).alias('rowdata'),
        from_json(roberta_udf(df_json.json), schema_output).alias('response'),
    )
    .select('rowdata.articles.description', 'response.embeddings')
    .writeStream
    .trigger(once=True)
    .format("json")
    .option("path", "/home/jovyan/")
    .option("checkpointLocation", "/home/jovyan/work/")
    .start()
    .awaitTermination()
)