In [13]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

# Q1

## (1)

In [14]:
# Directory that holds the parquet splits; the trailing slash matters because
# the file names are appended to it directly.
filefold = '/shareddata/data/project2/Task1/squad_v2/squad_v2/'

# Load both splits found in that directory (train first, then validation).
traindata, validdata = (
    pd.read_parquet(''.join((filefold, file_name)))
    for file_name in (
        'train-00000-of-00001.parquet',
        'validation-00000-of-00001.parquet',
    )
)

In [15]:
# Conversion helper
def convert_to_text2text(df):
    """Build text-to-text input/target columns for a question-answering frame.

    Parameters
    ----------
    df : pandas.DataFrame
        Needs ``question`` and ``context`` columns that can be concatenated
        with strings, and an ``answers`` column whose cells are dicts with a
        ``text`` entry holding a sequence of answer strings.

    Returns
    -------
    pandas.DataFrame
        A copy of ``df`` with two additional columns:

        * ``Processed input``  -- ``'question: <question> context: <context>'``
        * ``Processed target`` -- every entry of ``answers['text']`` joined by
          a single space; ``''`` when the cell is not a dict, lacks a ``text``
          key, or the sequence is empty.

    The caller's frame is never modified, so converting the same raw frame
    more than once (e.g. when a notebook cell is re-run) always starts from
    the same data.
    """
    # Work on a copy instead of adding columns to the caller's frame in place.
    out = df.copy()

    out['Processed input'] = 'question: ' + out['question'] + ' context: ' + out['context']

    # Extract the `text` field from the `answers` column
    out['Processed target'] = out['answers'].apply(
        lambda x: ' '.join(x['text']) if isinstance(x, dict) and 'text' in x else ''
    )

    return out

# Add the text-to-text columns to the original training data
converted_train = convert_to_text2text(traindata)

# Carve 5000 rows out of it as a validation set; the remainder becomes the
# new training set (fixed seed so the split is repeatable)
train_df, valid_df = train_test_split(
    converted_train,
    test_size=5000,
    random_state=42,
)

# The official validation split is converted as well and serves as the test set
test_df = convert_to_text2text(validdata)

In [16]:
# Store the three splits as Parquet files (the row index is not written)
for frame, target_path in (
    (train_df, '/data/HW/proj2/data/train_df.parquet'),
    (valid_df, '/data/HW/proj2/data/valid_df.parquet'),
    (test_df, '/data/HW/proj2/data/test_df.parquet'),
):
    frame.to_parquet(target_path, index=False)

In [17]:
# Read the saved training split back from disk and preview its first rows
# as a quick check of the columns that were written.
train = pd.read_parquet('/data/HW/proj2/data/train_df.parquet')
train.head()

Unnamed: 0,id,title,context,question,answers,Processed input,Processed target
0,56f8ba5a9b226e1400dd0ebf,Gene,"In all organisms, two steps are required to re...",What is the process of producing a biologicall...,"{'answer_start': [428], 'text': ['gene express...",question: What is the process of producing a b...,gene expression
1,5a65bfbec2b11c001a425d30,Antibiotics,Exposure to antibiotics early in life is assoc...,What are some microbiota that can be used for ...,"{'answer_start': [], 'text': []}",question: What are some microbiota that can be...,
2,56dfb4d6231d4119001abc97,Pub,The history of pubs can be traced back to Roma...,What Roman businesses were analogous to modern...,"{'answer_start': [48], 'text': ['taverns']}",question: What Roman businesses were analogous...,taverns
3,5719f37810f8ca1400304eb7,Seattle,Seattle typically receives some snowfall on an...,How many times has snowfall been reported at m...,"{'answer_start': [270], 'text': ['once']}",question: How many times has snowfall been rep...,once
4,57098302ed30961900e8424e,Identity_(social_science),Kenneth Gergen formulated additional classific...,Who formulated the classifications of strategi...,"{'answer_start': [0], 'text': ['Kenneth Gergen']}",question: Who formulated the classifications o...,Kenneth Gergen


In [18]:
# Display the full "Processed input" string of the row labelled 0
train.loc[0, "Processed input"]

"question: What is the process of producing a biologically functional molecule of either RNA or protein called? context: In all organisms, two steps are required to read the information encoded in a gene's DNA and produce the protein it specifies. First, the gene's DNA is transcribed to messenger RNA (mRNA).:6.1 Second, that mRNA is translated to protein.:6.2 RNA-coding genes must still go through the first step, but are not translated into protein. The process of producing a biologically functional molecule of either RNA or protein is called gene expression, and the resulting molecule is called a gene product."

In [24]:
# Read the saved validation split back from disk and report its row count.
valid = pd.read_parquet('/data/HW/proj2/data/valid_df.parquet')
len(valid)

5000

## (2)

## (3)

In [1]:
from pyspark.sql import SparkSession
from sparknlp.base import DocumentAssembler, PipelineModel
from sparknlp.annotator import *
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

# Extra jars passed to Spark through `spark.jars`. The paths are kept in a
# list and joined with commas: continuing a single string literal across
# lines with backslashes would embed the indentation of every continuation
# line in front of each path.
extra_jars = ",".join([
    "/data/HW/HW3/kafka_2.13-3.7.0/libs/kafka-clients-3.7.0.jar",
    "/shareddata/lab09/spark-sql-kafka-0-10_2.12-3.5.0.jar",
    "/shareddata/lab09/spark-token-provider-kafka-0-10_2.12-3.5.0.jar",
    "/shareddata/lab09/commons-pool2-2.12.0.jar",
])

# Local Spark session using all cores, with the Spark NLP package resolved
# through `spark.jars.packages` and the jars above added explicitly.
spark = (
    SparkSession.builder
    .appName("Spark NLP")
    .master("local[*]")
    .config("spark.driver.memory", "16G")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryoserializer.buffer.max", "2000M")
    .config("spark.driver.maxResultSize", "0")
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.3.3")
    .config("spark.jars", extra_jars)
    .getOrCreate()
)

# Kafka topic the incoming questions are read from
kafka_topic = "qaaa"

# Options that are later handed to the Kafka reader via `.options(**kafka_config)`.
# NOTE(review): "group.id" carries no "kafka." prefix, unlike the other consumer
# settings here; Spark's Kafka source documents `kafka.group.id` -- confirm
# whether this key has any effect.
kafka_config = dict([
    ("kafka.bootstrap.servers", "localhost:9092"),
    ("subscribe", kafka_topic),
    ("startingOffsets", "earliest"),
    ("kafka.max.partition.fetch.bytes", 2097152),
    ("kafka.max.poll.records", 500),
    ("group.id", "midas-flan-group"),
])

# Schema of the JSON payload: two nullable string fields
input_schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("context", "question")
])

# Streaming reader on the Kafka source, configured with `kafka_config`
raw_stream = (
    spark.readStream
    .format("kafka")
    .option("truncate", "false")
    .options(**kafka_config)
    .load()
)

# Cast each record's `value` to a string and parse it as JSON with `input_schema`
parsed_stream = raw_stream.select(
    from_json(col("value").cast("string"), input_schema).alias("data")
)

# Flatten the parsed struct so its fields become top-level columns
kafka_df = parsed_stream.select("data.*")

# Merge the question and context columns into one `text` column
text_expression = """
    concat_ws(' ', concat('question: ', question), concat('context: ', context) ) as text
"""
data_df = kafka_df.selectExpr(text_expression)

# Print the schema of data_df
data_df.printSchema()

# Write the stream into an in-memory table named `data_memory` (append mode)
memory_writer = (
    data_df.writeStream
    .outputMode("append")
    .format("memory")
    .queryName("data_memory")
)
data_output_query = memory_writer.start()

# Transforms raw texts into `document` annotation
document_assembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("documents")
)

# The T5 model
# The `text` column fed to this pipeline already begins with "question: " and
# follows the 'question: ... context: ...' layout used for the processed
# training inputs. T5Transformer prepends its task string to every input, so
# a task of "question:" would produce "question: question: ...". The task is
# therefore set to the empty string to keep the inference prompt identical to
# the training format.
t5 = (
    T5Transformer.load("/data/HW/proj2/best_model_spark_nlp")
    .setTask("")
    .setInputCols(["documents"])
    .setMaxOutputLength(200)
    .setOutputCol("t5")
)

# Define the Spark pipeline
pipeline = PipelineModel(stages=[document_assembler, t5])

# Read the rows collected in the `data_memory` table and run the model on them.
# NOTE(review): the streaming query feeding that table was started just above;
# presumably rows only show up once micro-batches have completed -- confirm
# the table is populated before relying on `result`.
result = pipeline.transform(spark.sql("SELECT * FROM data_memory"))

:: loading settings :: url = jar:file:/opt/module/spark-3.5.0-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.johnsnowlabs.nlp#spark-nlp_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f8664b38-6295-4f5d-86a7-fd168f055421;1.0
	confs: [default]
	found com.johnsnowlabs.nlp#spark-nlp_2.12;5.3.3 in central
	found com.typesafe#config;1.4.2 in central
	found org.rocksdb#rocksdbjni;6.29.5 in central
	found com.amazonaws#aws-java-sdk-s3;1.12.500 in central
	found com.amazonaws#aws-java-sdk-kms;1.12.500 in central
	found com.amazonaws#aws-java-sdk-core;1.12.500 in central
	found commons-logging#commons-logging;1.1.3 in central
	found commons-codec#commons-codec;1.15 in central
	found org.apache.httpcomponents#httpclient;4.5.13 in central
	found org.apache.httpcomponents#httpcore;4.4.13 in central
	found software.amazon.ion#ion-java;1.0.2 in central
	found joda-time#joda-time;2.8.1 in central
	found com.amazonaws#jmespath-java;1.12.500 in central
	found com.g

root
 |-- text: string (nullable = false)



24/05/22 13:20:50 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-752a6e36-64a6-4d93-a019-ed6fd1ef2ecf. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/05/22 13:20:50 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
[Stage 1:>                                                          (0 + 1) / 1]

Using CPUs


                                                                                

Using CPUs


In [2]:
result.select(["text", "t5.result"]).show(truncate=False)

[Stage 2:>                                                          (0 + 1) / 1]

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [3]:
result.select(["text", "t5.result"]).show()

[Stage 3:>                                                          (0 + 1) / 1]

+--------------------+--------------------+
|                text|              result|
+--------------------+--------------------+
|question: In what...|            [France]|
|question: When we...|[first half of th...|
|question: From wh...|[Denmark, Iceland...|
|question: Who was...|             [Rollo]|
|question: What ce...|      [10th century]|
|question: Who gav...|           [Normans]|
|question: What is...|          [Normandy]|
|question: Who did...|          [Normandy]|
|question: When di...|[first half of th...|
|question: Who was...|          [Roger II]|
|question: Who rul...|         [Richard I]|
|question: What re...|          [Catholic]|
|question: What ty...|[political, cultu...|
|question: Who was...|           [Normans]|
|question: Who ass...|           [Normans]|
|question: Who rul...|          [Roger II]|
|question: What pr...|[Principality of ...|
|question: What is...|   [Normans/Normanz]|
|question: When wa...|       [9th century]|
|question: What na...|          

                                                                                