# Project2
https://huggingface.co/docs/transformers/v4.29.1/en/tasks/question_answering#question-answering

## Kafka

In [1]:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
import pyspark.sql.functions as F
from pyspark import SparkConf
from pyspark.sql.types import StringType, StructType, StructField, IntegerType,FloatType, TimestampType
from pyspark.sql.functions import window
from pyspark.sql.functions import current_timestamp
from pyspark.streaming import StreamingContext

import os
os.environ['PYSPARK_PYTHON'] = "/root/miniconda3/envs/sp/bin/python"

spark = SparkSession.builder.appName("MyApp") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1") \
    .getOrCreate()

# spark = SparkSession.builder\
#   .config(
#     "spark.jars",
#     "/data/jupyter-data/lab08/kafka-clients-3.4.0.jar,/data/jupyter-data/lab08/spark-sql-kafka-0-10_2.12-3.3.1.jar,\
#     /data/jupyter-data/lab08/spark-token-provider-kafka-0-10_2.12-3.3.1.jar, \
#     /data/jupyter-data/lab08/commons-pool2-2.11.1.jar") \
#   .config("spark.executorEnv.PYSPARK_PYTHON","/root/miniconda3/envs/sp/bin/python") \
#   .config("spark.executor.memory", "2g") \
#   .config("spark.driver.memory", "2g") \
#   .appName("Pro2").getOrCreate()


:: loading settings :: url = jar:file:/root/miniconda3/envs/sp/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-92821ed3-2d4f-4e53-a005-748ea15ecf8b;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.1 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 245ms :: artifacts dl 7ms
	:: modules in use:
	com.github.luben#zstd-jni;1.4.8-1 from central in [default]
	org.apache.commons#commons-pool2;2.6.2 from central in [default]
	

23/05/16 02:13:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
from transformers import pipeline
question_answerer = pipeline("question-answering", model='distilbert-base-cased-distilled-squad')

context = r"""
Extractive Question Answering is the task of extracting an answer from a text given a question. An example     of a
question answering dataset is the SQuAD dataset, which is entirely based on that task. If you would like to fine-tune
a model on a SQuAD task, you may leverage the examples/pytorch/question-answering/run_squad.py script.
"""

result = question_answerer(question="What is a good example of a question answering dataset?",     context=context)
print(
f"Answer: '{result['answer']}', score: {round(result['score'], 4)}, start: {result['start']}, end: {result['end']}"
)

Answer: 'SQuAD dataset', score: 0.5152, start: 151, end: 164


In [4]:
context = r"""At the 57th Annual Grammy Awards in February 2015, Beyoncé was nominated for six awards, 
ultimately winning three: Best R&B Performance and Best R&B Song for "Drunk in Love", and Best Surround Sound
 Album for Beyoncé. She was nominated for Album of the Year but the award was won by Beck for his Morning Phase album.
 In August, the cover of the September issue of Vogue magazine was unveiled online, Beyoncé as the cover star,
   becoming the first African-American artist and third African-American woman in general to cover the September issue. 
   She headlined the 2015 Made in America festival in early September and also the Global Citizen Festival later that month.
 Beyoncé made an uncredited featured appearance on the track "Hymn for the Weekend" by British rock band Coldplay, 
 on their seventh studio album A Head Full of Dreams (2015), which saw release in December. On January 7, 
 2016, Pepsi announced Beyoncé would perform alongside Coldplay at Super Bowl 50 in February. Knowles has previously 
 performed at four Super Bowl shows throughout her career, serving as the main headliner of the 47th Super Bowl halftime show in 2013."""

result = question_answerer(question="Beyoncé lost the Album of the Year award to which entertainer?",     context=context)
print(
f"Answer: '{result['answer']}', score: {round(result['score'], 4)}, start: {result['start']}, end: {result['end']}"
)


Answer: 'Beck', score: 0.9964, start: 285, end: 289


In [5]:
df1 = spark.readStream.format("kafka")\
  .option("kafka.bootstrap.servers", "localhost:9092")\
  .option("subscribe", "project2") \
  .option("startingOffsets", "earliest") \
  .load()


schema = """context STRING, question STRING, answers_text STRING, answers_start STRING"""

# 将value列反序列化为JSON，并将JSON解析为DataFrame 

df2 = df1.selectExpr("CAST(value AS STRING)") \
    .select(F.from_json(F.col("value"), schema).alias("data")) \
    .select("data.*")

def my_answer(question = None, context = None):
    result = question_answerer(question=question, context=context)
    return f"Answer: '{result['answer']}', score: {round(result['score'], 4)}, start: {result['start']}, end: {result['end']}"
my_answer = F.udf(my_answer, StringType())


df3 = df2.withColumn("my_answer",my_answer(F.col("question"),F.col("context")))

query = df3.writeStream \
           .queryName("test2") \
           .outputMode("append") \
           .format("memory") \
           .start()


[Stage 2:>                                                          (0 + 1) / 1]

                                                                                

23/05/16 02:05:13 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-3f157232-4afe-4d31-9b87-6da654f111ac. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/05/16 02:05:13 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


IllegalArgumentException: Cannot start query with name test2 as a query with that name is already active in this SparkSession

23/05/16 02:05:16 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:05:21 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

In [6]:
spark.sql("select my_answer from test2 limit 20").collect()

[Row(my_answer="Answer: 'late 1990s', score: 0.4887, start: 276, end: 286"),
 Row(my_answer="Answer: 'singing and dancing', score: 0.97, start: 207, end: 226"),
 Row(my_answer="Answer: '2003', score: 0.9826, start: 526, end: 530"),
 Row(my_answer="Answer: 'Houston, Texas', score: 0.5976, start: 166, end: 180"),
 Row(my_answer="Answer: '1990s', score: 0.7309, start: 281, end: 286"),
 Row(my_answer="Answer: 'Destiny's Child', score: 0.9812, start: 320, end: 335"),
 Row(my_answer="Answer: 'Dangerously in Love', score: 0.9495, start: 505, end: 524"),
 Row(my_answer="Answer: 'Mathew Knowles', score: 0.6734, start: 360, end: 374"),
 Row(my_answer="Answer: 'late 1990s', score: 0.56, start: 276, end: 286"),
 Row(my_answer="Answer: 'lead singer', score: 0.9842, start: 290, end: 301"),
 Row(my_answer="Answer: 'Dangerously in Love', score: 0.9655, start: 505, end: 524"),
 Row(my_answer="Answer: '2003', score: 0.982, start: 526, end: 530"),
 Row(my_answer="Answer: 'five', score: 0.9229, start: 590

23/05/16 02:05:26 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:05:31 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:05:36 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:05:41 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:05:46 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:05:51 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:05:56 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:06:01 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:06:06 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:06:11 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:06:16 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:06:21 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:06:26 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:06:31 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:06:36 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:06:41 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:06:46 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:06:51 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:06:56 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:07:01 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:07:06 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:07:11 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:07:16 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:07:21 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:07:26 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:07:31 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:07:36 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:07:41 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:07:46 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:07:51 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:07:56 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:08:01 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

23/05/16 02:08:06 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

In [22]:
# import json
# from pyspark.sql.functions import from_json
# from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# schema = StructType([
#     StructField("context", StringType(), True),
#     StructField("question", StringType(), True),
#     StructField("answers_text", StringType(), True),
#     StructField("answers_start", IntegerType(), True)
# ])

# # 从Kafka读取数据并转换为DataFrame
# df = spark \
#     .readStream \
#     .format("kafka") \
#     .option("kafka.bootstrap.servers", "localhost:9092") \
#     .option("subscribe", "project2") \
#     .option("startingOffsets", "earliest") \
#     .load() \
#     .selectExpr("CAST(value AS STRING)") \
#     .select(from_json("value", schema).alias("data")) \
#     .select("data.*")

# # 查看数据
# query = df \
#     .writeStream \
#     .outputMode("append") \
#     .format("console") \
#     .start() \
#     .awaitTermination()


                                                                                

23/05/14 08:01:39 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
23/05/14 08:01:43 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
23/05/14 08:01:43 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
23/05/14 08:01:43 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


                                                                                

### qa

### Example
{'context': 'At the 57th Annual Grammy Awards in February 2015, Beyoncé was nominated for six awards, ultimately winning three: Best R&B Performance and Best R&B Song for "Drunk in Love", and Best Surround Sound Album for Beyoncé. She was nominated for Album of the Year but the award was won by Beck for his Morning Phase album. In August, the cover of the September issue of Vogue magazine was unveiled online, Beyoncé as the cover star, becoming the first African-American artist and third African-American woman in general to cover the September issue. She headlined the 2015 Made in America festival in early September and also the Global Citizen Festival later that month. Beyoncé made an uncredited featured appearance on the track "Hymn for the Weekend" by British rock band Coldplay, on their seventh studio album A Head Full of Dreams (2015), which saw release in December. On January 7, 2016, Pepsi announced Beyoncé would perform alongside Coldplay at Super Bowl 50 in February. Knowles has previously performed at four Super Bowl shows throughout her career, serving as the main headliner of the 47th Super Bowl halftime show in 2013.', 'question': 'Beyoncé lost the Album of the Year award to which entertainer?', 'answers': {'text': ['Beck'], 'answer_start': [283]}}

## Spark NLP

In [11]:
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
import sparknlp

spark = sparknlp.start()

Document_Assembler = MultiDocumentAssembler()\
     .setInputCols(["question", "context"])\
     .setOutputCols(["document_question", "document_context"])

Question_Answering = RoBertaForQuestionAnswering.pretrained("roberta_qa_deepset_base_squad2","en")\
     .setInputCols(["document_question", "document_context"])\
     .setOutputCol("answer")\
     .setCaseSensitive(True)
    
pipeline = Pipeline(stages=[Document_Assembler, Question_Answering])



23/05/16 06:49:55 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
roberta_qa_deepset_base_squad2 download started this may take some time.
Approximate size to download 442.6 MB
[OK!]


                                                                                

In [14]:
data = spark.createDataFrame([["What's my name?","My name is Clara and I live in Berkeley."],["What's my age?","My name is Clara and I live in Berkeley."]]).toDF("question", "context")

result = pipeline.fit(data).transform(data)

result.select("answer.result").show(truncate=False)
# data.show()

+-------+
|result |
+-------+
|[Clara]|
|[]     |
+-------+



                                                                                

In [30]:
# show the schema of result
result.printSchema()

root
 |-- question: string (nullable = true)
 |-- context: string (nullable = true)
 |-- document_question: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)
 |-- document_context: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string

In [1]:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
import pyspark.sql.functions as F
from pyspark import SparkConf
from pyspark.sql.types import StringType, StructType, StructField, IntegerType,FloatType, TimestampType
from pyspark.sql.functions import window
from pyspark.sql.functions import current_timestamp
from pyspark.streaming import StreamingContext
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
import sparknlp

# spark = SparkSession.builder \
#         .appName("MyApp") \
#         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1") \
#         .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:3.3.0") \
#         .getOrCreate()
# spark.sparkContext.setLogLevel("ERROR")
spark = SparkSession.builder\
  .master("local[*]")\
  .config(
    "spark.jars", 
    "/data/jupyter-data/labs/lab08/kafka-clients-3.4.0.jar,/data/jupyter-data/labs/lab08/spark-sql-kafka-0-10_2.12-3.3.1.jar, \
    /data/jupyter-data/labs/lab08/spark-token-provider-kafka-0-10_2.12-3.3.1.jar, \
    /data/jupyter-data/labs/lab08/commons-pool2-2.11.1.jar") \
  .config("spark.executorEnv.PYSPARK_PYTHON","/root/miniconda3/envs/sp/bin/python") \
  .config("spark.executor.memory", "2g") \
  .config("spark.driver.memory", "2g") \
  .config("spark.driver.maxResultSize", "0")\
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")\
  .config("spark.kryoserializer.buffer.max", "2000m")\
  .config("spark.jsl.settings.pretrained.cache_folder", "sample_data/pretrained")\
  .config("spark.jsl.settings.storage.cluster_tmp_dir", "sample_dataorage")\
  .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:4.4.1")\
  .getOrCreate()

Document_Assembler = MultiDocumentAssembler()\
     .setInputCols(["question", "context"])\
     .setOutputCols(["document_question", "document_context"])

Question_Answering = RoBertaForQuestionAnswering.pretrained("roberta_qa_deepset_base_squad2","en")\
     .setInputCols(["document_question", "document_context"])\
     .setOutputCol("answer")\
     .setCaseSensitive(True)
    
pipeline = Pipeline(stages=[Document_Assembler, Question_Answering])

nlp_df1 = spark.readStream.format("kafka")\
  .option("kafka.bootstrap.servers", "localhost:9092")\
  .option("subscribe", "project2") \
  .option("startingOffsets", "earliest") \
  .load()

schema = """context STRING, question STRING, answers_text STRING, answers_start STRING"""

# 将value列反序列化为JSON，并将JSON解析为DataFrame 

nlp_df2 = nlp_df1.selectExpr("CAST(value AS STRING)") \
    .select(F.from_json(F.col("value"), schema).alias("data")) \
    .select("data.question", "data.context")

result = pipeline.fit(nlp_df2).transform(nlp_df2)#.select("answer.result")

nlp_df2.writeStream \
           .queryName("input") \
           .outputMode("append") \
           .format("memory") \
           .start()

result.writeStream \
            .queryName("output") \
            .outputMode("append") \
            .format("memory") \
            .start()

:: loading settings :: url = jar:file:/root/miniconda3/envs/sp/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.johnsnowlabs.nlp#spark-nlp_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6b6a5233-8602-47f0-9b30-89d3a4ca287f;1.0
	confs: [default]
	found com.johnsnowlabs.nlp#spark-nlp_2.12;4.4.1 in central
	found com.typesafe#config;1.4.2 in central
	found org.rocksdb#rocksdbjni;6.29.5 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.828 in central
	found com.github.universal-automata#liblevenshtein;3.0.0 in central
	found com.google.protobuf#protobuf-java-util;3.0.0-beta-3 in central
	found com.google.protobuf#protobuf-java;3.0.0-beta-3 in central
	found com.google.code.gson#gson;2.3 in central
	found it.unimi.dsi#fastutil;7.0.12 in central
	found org.projectlombok#lombok;1.16.8 in central
	found com.google.cloud#google-cloud-storage;2.16.0 in central
	found com.google.guava#guava;31.1-jre in central
	found com.google.guava#failureaccess;1.0.1 

23/05/16 06:45:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


roberta_qa_deepset_base_squad2 download started this may take some time.
Approximate size to download 442.6 MB
[ | ]roberta_qa_deepset_base_squad2 download started this may take some time.
Approximate size to download 442.6 MB
[ / ]Download done! Loading the resource.
[ \ ]

2023-05-16 06:45:56.092791: I external/org_tensorflow/tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


[OK!]
23/05/16 06:46:02 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-36adac68-6c7e-464d-af02-18fd089abba7. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/05/16 06:46:02 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/05/16 06:46:02 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-ab7c868c-4d1c-4ffa-a33b-03a7d4805add. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/05/16 06:46:02 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not s

<pyspark.sql.streaming.StreamingQuery at 0x7fd1bb345990>

In [18]:
spark.sql("select * from input limit 20").show(truncate=False)

+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [21]:
spark.sql("select question,answer.result from output limit 20").show(truncate=False)

+----------------------------------------------------------------------------------+---------------------+
|question                                                                          |result               |
+----------------------------------------------------------------------------------+---------------------+
|When did Beyonce start becoming popular?                                          |[late 1990 s]        |
|What areas did Beyonce compete in when she was growing up?                        |[singing and dancing]|
|When did Beyonce leave Destiny's Child and become a solo singer?                  |[2003]               |
|In what city and state did Beyonce  grow up?                                      |[Houston , Texas]    |
|In which decade did Beyonce become famous?                                        |[1990 s]             |
|In what R&B group was she the lead singer?                                        |[Destiny 's Child]   |
|What album made her a worldwide know