In [1]:
from sparknlp.annotator import *
from sparknlp.base import *
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json

# Start a local Spark session with Spark NLP and the Kafka connector on the classpath.
# The Kafka connector jars (built for Spark 3.5.0 / Scala 2.12) live in a shared lab dir.
KAFKA_JAR_DIR = "/shareddata/lab09"
KAFKA_JARS = [
    "kafka-clients-3.5.0.jar",
    "spark-sql-kafka-0-10_2.12-3.5.0.jar",
    "spark-token-provider-kafka-0-10_2.12-3.5.0.jar",
    "commons-pool2-2.12.0.jar",
]

# Applied in insertion order, exactly as the builder chain would apply them.
spark_conf = {
    "spark.driver.memory": "16G",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.kryoserializer.buffer.max": "2000M",
    "spark.driver.maxResultSize": "0",  # 0 = no limit on collected result size
    "spark.jars.packages": "com.johnsnowlabs.nlp:spark-nlp_2.12:5.3.3",
    "spark.jars": ",".join(f"{KAFKA_JAR_DIR}/{jar_name}" for jar_name in KAFKA_JARS),
}

session_builder = SparkSession.builder.appName("Spark NLP").master("local[*]")
for conf_key, conf_value in spark_conf.items():
    session_builder = session_builder.config(conf_key, conf_value)
spark = session_builder.getOrCreate()

:: loading settings :: url = jar:file:/opt/module/spark-3.5.0-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.johnsnowlabs.nlp#spark-nlp_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c082316f-a886-4ad7-9fc5-aeb167e97542;1.0
	confs: [default]
	found com.johnsnowlabs.nlp#spark-nlp_2.12;5.3.3 in central
	found com.typesafe#config;1.4.2 in central
	found org.rocksdb#rocksdbjni;6.29.5 in central
	found com.amazonaws#aws-java-sdk-s3;1.12.500 in central
	found com.amazonaws#aws-java-sdk-kms;1.12.500 in central
	found com.amazonaws#aws-java-sdk-core;1.12.500 in central
	found commons-logging#commons-logging;1.1.3 in central
	found commons-codec#commons-codec;1.15 in central
	found org.apache.httpcomponents#httpclient;4.5.13 in central
	found org.apache.httpcomponents#httpcore;4.4.13 in central
	found software.amazon.ion#ion-java;1.0.2 in central
	found joda-time#joda-time;2.8.1 in central
	found com.amazonaws#jmespath-java;1.12.500 in central
	found com.g

In [2]:
# Streaming source: every record of the "qa" Kafka topic, replayed from the earliest offset.
# Kafka rows carry raw bytes in `key`/`value`; decoding happens in the next cell.
text = spark.readStream.format("kafka").options(**{
    "kafka.bootstrap.servers": "localhost:64046",
    "subscribe": "qa",
    "startingOffsets": "earliest",
}).load()
text.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [3]:
# Decode each Kafka payload as a string and parse it as JSON into a struct column
# `data` with string fields `Input` and `Output`.
query = (
    text.selectExpr("CAST(value AS STRING) as value")
    .select(from_json("value", schema="Input STRING, Output STRING").alias("data"))
)

# Materialize the stream into the in-memory table "Show", refreshed every 5 seconds.
# The query has no aggregation, so "append" is the appropriate mode; the memory sink
# documents only append/complete as supported output modes.
activityQuery = (
    query.writeStream
    .outputMode("append")
    .format("memory")
    .queryName("Show")
    .trigger(processingTime="5 seconds")
    .start()
)

24/06/07 14:20:04 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-cefefe35-419d-4c91-a656-e34338099a42. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/06/07 14:20:04 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [5]:
# Peek at the first five parsed records, keeping long contexts untruncated.
spark.table("Show").select("data.*").show(n=5, truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------+
|Input                                                                                                                                         

In [7]:
MODEL_NAME = "/data/lab/Projects/Project2/best_model"
EXPORT_PATH = f"""./best_model/onnx_models/flan-t5-small"""
! optimum-cli export onnx --task text2text-generation-with-past --model {MODEL_NAME} {EXPORT_PATH}
! mkdir -p {EXPORT_PATH}/assets
! mv -t {EXPORT_PATH}/assets {EXPORT_PATH}/spiece.model

  _torch_pytree._register_pytree_node(
  _torch_pytree._register_pytree_node(
Framework not specified. Using pt to export the model.
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
Using the export variant default. Available variants are:
    - default: The default ONNX variant.
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.

***** Exporting submodel 1/3: T5Stack *****
Using framework PyTorch: 2.3.0+cu121
Overriding 1 configuration item(s)
	- use_cache -> False

***** Exporting submodel 2/3: T5ForConditionalGeneration *****
Using framework PyTorch: 2.3.0+cu121
Overriding 1 configuration item(s)
	- use_cache -> True


In [8]:
T5 = T5Transformer.loadSavedModel(EXPORT_PATH, spark)\
   .setUseCache(True) \
   .setTask("qa:") \
   .setMaxOutputLength(200)

T5.write().overwrite().save(f"{MODEL_NAME}_spark_nlp")
!rm -rf {EXPORT_PATH}

Using CPUs
Using CPUs


In [9]:
# Inference pipeline: raw `text` column -> Spark NLP `document` -> generated `answers`.
documentAssembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

T5 = (
    T5Transformer.load(f"{MODEL_NAME}_spark_nlp")
    .setInputCols(["document"])
    .setOutputCol("answers")
)

pipeline = Pipeline(stages=[documentAssembler, T5])

Using CPUs
Using CPUs


In [11]:
from kafka import KafkaConsumer
import json

# Answer each new question published to the "qa" topic with the T5 pipeline.
broker = 'localhost:64046'
consumer = KafkaConsumer(
    'qa',
    bootstrap_servers=broker,
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    consumer_timeout_ms=100000,   # iteration ends after 100 s without a new message
    auto_offset_reset='latest',   # no committed offsets: start from the end of the topic
    enable_auto_commit=False,
)

# Every stage is already a fitted transformer, so fitting does not depend on the data:
# build the PipelineModel once instead of re-fitting for every message.
qa_model = pipeline.fit(spark.createDataFrame([[""]]).toDF("text"))

try:
    for message in consumer:
        # Use a distinct name so the streaming DataFrame `text` from the reader cell is not clobbered.
        question = message.value.get('Input') if isinstance(message.value, dict) else None
        if not question:
            print(f"Skipping message at offset {message.offset}: no 'Input' field")
            continue
        question_df = spark.createDataFrame([[question]]).toDF("text")
        # One input row -> one row whose `answers.result` is a list of generated strings.
        answers = qa_model.transform(question_df).select("answers.result").collect()[0][0]
        answer = answers[0] if answers else ""
        print(f"Based on: {question}\nAnswer: {answer}")
finally:
    # Release the broker connection even if the loop is interrupted.
    consumer.close()

Based on: question: What is one of the 10 megaregions in the United States? context: The 8- and 10-county definitions are not used for the greater Southern California Megaregion, one of the 11 megaregions of the United States. The megaregion's area is more expansive, extending east into Las Vegas, Nevada, and south across the Mexican border into Tijuana.
Answer: Southern California Megaregion


                                                                                

Based on: question: Where does the 8 county megaregion extend from? context: The 8- and 10-county definitions are not used for the greater Southern California Megaregion, one of the 11 megaregions of the United States. The megaregion's area is more expansive, extending east into Las Vegas, Nevada, and south across the Mexican border into Tijuana.
Answer: east into Las Vegas, Nevada, and south across the Mexican border into Tijuana


                                                                                

Based on: question: What is Las Vegas one of in the United States? context: The 8- and 10-county definitions are not used for the greater Southern California Megaregion, one of the 11 megaregions of the United States. The megaregion's area is more expansive, extending east into Las Vegas, Nevada, and south across the Mexican border into Tijuana.
Answer: the greater Southern California Megaregion


                                                                                

Based on: question: Which coastline does Southern California touch? context: Southern California includes the heavily built-up urban area stretching along the Pacific coast from Ventura, through the Greater Los Angeles Area and the Inland Empire, and down to Greater San Diego. Southern California's population encompasses seven metropolitan areas, or MSAs: the Los Angeles metropolitan area, consisting of Los Angeles and Orange counties; the Inland Empire, consisting of Riverside and San Bernardino counties; the San Diego metropolitan area; the Oxnard–Thousand Oaks–Ventura metropolitan area; the Santa Barbara metro area; the San Luis Obispo metropolitan area; and the El Centro area. Out of these, three are heavy populated areas: the Los Angeles area with over 12 million inhabitants, the Riverside-San Bernardino area with over four million inhabitants, and the San Diego area with over 3 million inhabitants. For CSA metropolitan purposes, the five counties of Los Angeles, Orange, Riverside

                                                                                

Based on: question: How many metropolitan areas does Southern California's population encompass? context: Southern California includes the heavily built-up urban area stretching along the Pacific coast from Ventura, through the Greater Los Angeles Area and the Inland Empire, and down to Greater San Diego. Southern California's population encompasses seven metropolitan areas, or MSAs: the Los Angeles metropolitan area, consisting of Los Angeles and Orange counties; the Inland Empire, consisting of Riverside and San Bernardino counties; the San Diego metropolitan area; the Oxnard–Thousand Oaks–Ventura metropolitan area; the Santa Barbara metro area; the San Luis Obispo metropolitan area; and the El Centro area. Out of these, three are heavy populated areas: the Los Angeles area with over 12 million inhabitants, the Riverside-San Bernardino area with over four million inhabitants, and the San Diego area with over 3 million inhabitants. For CSA metropolitan purposes, the five counties of L

                                                                                

Based on: question: How many inhabitants does the Los Angeles area contain? context: Southern California includes the heavily built-up urban area stretching along the Pacific coast from Ventura, through the Greater Los Angeles Area and the Inland Empire, and down to Greater San Diego. Southern California's population encompasses seven metropolitan areas, or MSAs: the Los Angeles metropolitan area, consisting of Los Angeles and Orange counties; the Inland Empire, consisting of Riverside and San Bernardino counties; the San Diego metropolitan area; the Oxnard–Thousand Oaks–Ventura metropolitan area; the Santa Barbara metro area; the San Luis Obispo metropolitan area; and the El Centro area. Out of these, three are heavy populated areas: the Los Angeles area with over 12 million inhabitants, the Riverside-San Bernardino area with over four million inhabitants, and the San Diego area with over 3 million inhabitants. For CSA metropolitan purposes, the five counties of Los Angeles, Orange, R

                                                                                

Based on: question: Which of the three heavily populated areas has the least number of inhabitants? context: Southern California includes the heavily built-up urban area stretching along the Pacific coast from Ventura, through the Greater Los Angeles Area and the Inland Empire, and down to Greater San Diego. Southern California's population encompasses seven metropolitan areas, or MSAs: the Los Angeles metropolitan area, consisting of Los Angeles and Orange counties; the Inland Empire, consisting of Riverside and San Bernardino counties; the San Diego metropolitan area; the Oxnard–Thousand Oaks–Ventura metropolitan area; the Santa Barbara metro area; the San Luis Obispo metropolitan area; and the El Centro area. Out of these, three are heavy populated areas: the Los Angeles area with over 12 million inhabitants, the Riverside-San Bernardino area with over four million inhabitants, and the San Diego area with over 3 million inhabitants. For CSA metropolitan purposes, the five counties o

                                                                                

Based on: question: How many people does the Greater Los Angeles Area have? context: Southern California includes the heavily built-up urban area stretching along the Pacific coast from Ventura, through the Greater Los Angeles Area and the Inland Empire, and down to Greater San Diego. Southern California's population encompasses seven metropolitan areas, or MSAs: the Los Angeles metropolitan area, consisting of Los Angeles and Orange counties; the Inland Empire, consisting of Riverside and San Bernardino counties; the San Diego metropolitan area; the Oxnard–Thousand Oaks–Ventura metropolitan area; the Santa Barbara metro area; the San Luis Obispo metropolitan area; and the El Centro area. Out of these, three are heavy populated areas: the Los Angeles area with over 12 million inhabitants, the Riverside-San Bernardino area with over four million inhabitants, and the San Diego area with over 3 million inhabitants. For CSA metropolitan purposes, the five counties of Los Angeles, Orange, R

                                                                                

Based on: question: What percent of California's 22 million people live in southern California? context: Southern California includes the heavily built-up urban area stretching along the Pacific coast from Ventura, through the Greater Los Angeles Area and the Inland Empire, and down to Greater San Diego. Southern California's population encompasses seven metropolitan areas, or MSAs: the Los Angeles metropolitan area, consisting of Los Angeles and Orange counties; the Inland Empire, consisting of Riverside and San Bernardino counties; the San Diego metropolitan area; the Oxnard–Thousand Oaks–Ventura metropolitan area; the Santa Barbara metro area; the San Luis Obispo metropolitan area; and the El Centro area. Out of these, three are heavy populated areas: the Los Angeles area with over 12 million inhabitants, the Riverside-San Bernardino area with over four million inhabitants, and the San Diego area with over 3 million inhabitants. For CSA metropolitan purposes, the five counties of Lo

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/root/anaconda3/envs/sp/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/root/anaconda3/envs/sp/lib/python3.9/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/root/anaconda3/envs/sp/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt
                                                                                

KeyboardInterrupt: 

Exception in thread "serve-DataFrame" java.net.SocketTimeoutException: Accept timed out
	at java.net.PlainSocketImpl.socketAccept(Native Method)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:535)
	at java.net.ServerSocket.implAccept(ServerSocket.java:545)
	at java.net.ServerSocket.accept(ServerSocket.java:513)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:65)
