In [3]:
import datetime
import time
import pymupdf as fitz  # PyMuPDF
import requests
import json
import numpy as np
import os
from transformers import BertTokenizer


## 读取各类型文档

### HTML

读取文件夹中所有的html文件

In [5]:
import os.path
import re
from bs4 import BeautifulSoup

# 定义需要匹配的关键词列表：
keywords = ["参见", "参考文献", "相关条目", "外部链接"]

def extract_text_from_html(html_path):
    pathDir = os.listdir(html_path)  #获取当前路径下的文件名，返回list
    temp = 1
    for s in pathDir:
        newDir = os.path.join(html_path, s)   #将文件名加入到当前文件路径后面

        if os.path.isfile(newDir):
            if os.path.splitext(newDir)[1]=='.html' or os.path.splitext(newDir)[1]=='.htm':
                bs = BeautifulSoup(open(newDir), 'html.parser')
                h1 = bs.find("div", "mw-content-ltr mw-parser-output")  #查找词条内容
                text = h1.get_text()   #获取文本内容
                #删除“关键词”之后的内容
                pattern = "|".join(map(re.escape, keywords))
                match = re.search(pattern, text)
                if match:
                    text = text[:match.start()]
                #保存text内容到文件
                output_file = os.path.join("html_to_txt", 'html_txt'+str(temp))
                with open(output_file+'.txt', 'w', encoding='utf-8') as f:
                    f.write(text+'\n')
            temp += 1
        else:
            extract_text_from_html(html_path)

In [6]:
html_path = 'html_files'
extract_text_from_html(html_path)
print('提取完成')
print("提取的html文件名列表：", os.listdir('html_to_txt'))

提取完成
提取的html文件名列表： ['html_txt20.txt', 'html_txt26.txt', 'html_txt11.txt', 'html_txt18.txt', 'html_txt22.txt', 'html_txt44.txt', 'html_txt21.txt', 'html_txt3.txt', 'html_txt1.txt', 'html_txt10.txt', 'html_txt36.txt', 'html_txt35.txt', 'html_txt7.txt', 'html_txt27.txt', 'html_txt30.txt', 'html_txt46.txt', 'html_txt5.txt', 'html_txt23.txt', 'html_txt34.txt', 'html_txt17.txt', 'html_txt38.txt', 'html_txt12.txt', 'html_txt2.txt', 'html_txt37.txt', 'html_txt32.txt', 'html_txt48.txt', 'html_txt31.txt', 'html_txt19.txt', 'html_txt43.txt', 'html_txt47.txt', 'html_txt24.txt', 'html_txt14.txt', 'html_txt15.txt', 'html_txt9.txt', 'html_txt49.txt', 'html_txt4.txt', 'html_txt28.txt', 'html_txt8.txt', 'html_txt40.txt', 'html_txt13.txt', 'html_txt29.txt', 'html_txt42.txt', 'html_txt16.txt', 'html_txt41.txt', 'html_txt45.txt', 'html_txt25.txt', 'html_txt39.txt', 'html_txt6.txt', 'html_txt33.txt', 'html_txt50.txt']


### PDF

In [1]:
import os
import pymupdf as fitz  # PyMuPDF

# 从PDF文件中提取文本函数
def extract_text_from_pdf(pdf_path):
    text = ""
    pdf_document = fitz.open(pdf_path)
    for page_num in range(len(pdf_document)):
        page = pdf_document.load_page(page_num)
        text += page.get_text()
    return text

## 文档分割和存储

In [2]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from transformers import BertTokenizer

tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
def tokens(text: str) -> int:
    return len(tokenizer.encode(text))

def split_text(text, chunk_size, chunk_overlap):
    # 定义适用于中文的分割符，按照优先级从高到低排列
    separators = ["\n\n", "\n", "。", "！", "？", "；", "，", "、", "：", " ", "", "\.\s|\!\s|\?\s", "；|;\s", "，|,\s",]
    text_splitter = RecursiveCharacterTextSplitter(
        separators=separators,
        chunk_size=chunk_size,       # 每个文本块的最大字符数
        chunk_overlap=chunk_overlap,    # 文本块之间的重叠字符数
        length_function = tokens
    )
    chunks = text_splitter.split_text(text)
    return chunks


  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# 提取PDF文件中的文本并按token数和字符数分割
import os.path

pdf_files = ["20240401-0级.pdf", "20240401-1级.pdf", "CSST科学白皮书_v1.2.pdf", "CSST科学数据处理系统数据处理软件系统设计20231108.pdf"]
chunk_size = 4000
chunk_overlap = 40
documents = []
for pdf in pdf_files:
    text = extract_text_from_pdf(pdf)
    paragraphs = split_text(text, chunk_size, chunk_overlap)
    documents.extend(paragraphs)

# 将分割后的文本块分别存储到多个txt文件中，并保存文件名到一个列表中
txt_dir = 'split_text_files'
file_names = []
for idx, doc in enumerate(documents, start=1):
    file_name = os.path.join(txt_dir, f"text_file{idx}.txt")
    with open(file_name, "w", encoding="utf-8") as file:
        file.write(doc)
    file_names.append(file_name)

print("文本文件已成功生成。")
print("生成的文件名列表：", file_names)

文本文件已成功生成。
生成的文件名列表： ['split_text_files/text_file1.txt', 'split_text_files/text_file2.txt', 'split_text_files/text_file3.txt', 'split_text_files/text_file4.txt', 'split_text_files/text_file5.txt', 'split_text_files/text_file6.txt', 'split_text_files/text_file7.txt', 'split_text_files/text_file8.txt', 'split_text_files/text_file9.txt', 'split_text_files/text_file10.txt', 'split_text_files/text_file11.txt', 'split_text_files/text_file12.txt', 'split_text_files/text_file13.txt', 'split_text_files/text_file14.txt', 'split_text_files/text_file15.txt', 'split_text_files/text_file16.txt', 'split_text_files/text_file17.txt', 'split_text_files/text_file18.txt', 'split_text_files/text_file19.txt', 'split_text_files/text_file20.txt', 'split_text_files/text_file21.txt', 'split_text_files/text_file22.txt', 'split_text_files/text_file23.txt', 'split_text_files/text_file24.txt', 'split_text_files/text_file25.txt', 'split_text_files/text_file26.txt', 'split_text_files/text_file27.txt', 'split_text_fil

## QA对生成

In [2]:
# 设置百度文心一言的API密钥和端点
API_KEY = "MxvHfAoOFUATRfpnohbnBAYb"
SECRET_KEY = "hOW7n2JSxNJQV1UvYxoUHmoNkmxGi3eB"

def get_access_token():
    """
    使用 AK，SK 生成鉴权签名（Access Token）
    :return: access_token，或是None(如果错误)
    """
    url = "https://aip.baidubce.com/oauth/2.0/token"
    params = {"grant_type": "client_credentials", "client_id": API_KEY, "client_secret": SECRET_KEY}
    return str(requests.post(url, params=params).json().get("access_token"))

### 问题生成prompt

In [3]:
# 问题生成prompt
prompt1 = '''
#01 你是一个问答对数据集处理专家，并且拥有丰富的天文领域知识。

#02 你的任务是根据我给出的内容，生成适合作为问答对数据集的问题。

#03 该内容有很多从表格中提取出的内容，多根据表格内容生成问题。

#04 问题要尽量短，不要太长。

#05 一句话中只能有一个问题，问题尽量有意义。

#06 最多生成10个问题。

#07 生成问题示例：

"""

"积分视场光谱仪的0级数据包含哪些内容？",
"MCI模块的数据文件命名规范是什么样的？",
"MSC_MS 0级数据关键字PCOUNT是什么意思？",
"MSC_MS 0级数据关键字EXTVER的数据类型是什么？"

"""

#07 以下是我给出的内容：

"""

{{此处替换成你的内容}}

"""
'''

In [5]:
def generate_question(text_content, more=False):
    url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions?access_token=" + get_access_token()
    content= "生成适合作为问答对的问题"
    if more:
        content = "尽可能多生成适合作为问答对的问题"
    prompt = prompt1.replace("{{此处替换成你的内容}}", text_content)
    payload = json.dumps({
        "messages": [
            {
                "role": "user",
                "content": content
            }
        ],
        "temperature": 0.95,
        "top_p": 0.8,
        "system":prompt
    })
    headers = {
        'Content-Type': 'application/json'
    }
    start_time = time.time()
    response = requests.request("POST", url, headers=headers, data=payload)
    x = json.loads(response.text)
    print("耗时", time.time() - start_time)
    print(x)
    if response.status_code == 200:
        return x['result']
    else:
        print(f"Error: {response.status_code}")
        print(response.content)
        return None

### 问答对生成prompt

In [4]:
#问答对生成prompt

prompt2 = '''
#01 你是一个问答对数据集处理专家。

#02 你的任务是根据我的问题和我给出的内容，生成对应的问答对。

#03 答案要全面，只使用我的信息，如果找不到答案，就回复从文档中找不到答案。

#04 使用json将QA对包裹起来，问题用"question"表示，答案用"answer"表示,所有问题答案对存入list:
    
[
    {"question": "积分视场光谱仪的0级数据包含哪些内容？", "answer": "积分视场光谱仪包含以下内容：1.观测目标的原始曝光图像；2.参考图像， 包括偏置、 平场、 暗电流等原始图像数据；3.观测期间的原始导星数据（和多通道成像
仪共用）。这些数据格式都是FITS文件。"},
    {"question": "MSC_MS 0级数据关键字PCOUNT是什么意思？", "answer": 在MSC_MS 0级数据头文件中，PCOUNT关键字表示伴随数据的参数数组的长度。在这个上下文中，对于原始数据，PCOUNT的样例值是0，表示没有伴随数据的参数数组。"}
]


#05 我的问题如下：

"""

{{此处替换成你上一步生成的问题}}

"""

#06 我的内容如下：

"""

{{此处替换成你的内容}}

"""
'''

In [6]:
def generate_qa(text_content, question_text=None):
    url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions?access_token=" + get_access_token()
    content= "拼成问答对"
    prompt = prompt2.replace("{{此处替换成你上一步生成的问题}}", question_text).replace("{{此处替换成你的内容}}", text_content)
    payload = json.dumps({
        "messages": [
            {
                "role": "user",
                "content": content
            }
        ],
        "temperature": 0.95,
        "top_p": 0.8,
        "system":prompt
    })
    headers = {
        'Content-Type': 'application/json'
    }
    start_time = time.time()
    response = requests.request("POST", url, headers=headers, data=payload)
    x = json.loads(response.text)
    print("耗时", time.time() - start_time)
    print(x)
    if response.status_code == 200:
        return x['result']
    else:
        print(f"Error: {response.status_code}")
        print(response.content)
        return None


### 生成QA对并保存至txt文件

In [10]:
def read_file(file_name):
    try:
        with open(file_name, "r", encoding='utf-8') as file:
            content = file.read()
        return content
    except FileNotFoundError:
        print(f"File '{file_name}' not found.")


# 将生成的QA对保存到文件中
def write_to_file(content):
    txt_dir = 'QA_pairs'
    timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    file_name = os.path.join(txt_dir, f"new_file_{timestamp}.txt")
    with open(file_name, "w", encoding="utf-8") as file:
        file.write(content)
    print(f"File '{file_name}' has been created and written.")


#QA对生成
file_list = [f"split_text_files/text_file{i}.txt" for i in range(77, 184)]
for file in file_list:
    text_content = read_file(file)
    print(file)
    print ('text_content:\n', text_content)
    question_text = generate_question(text_content=text_content, more=False)
    print('question_text:\n', question_text)
    qa_text = generate_qa(text_content=text_content, question_text=question_text)
    print('qa_text:\n', qa_text)
    write_to_file(qa_text)


split_text_files/text_file77.txt
text_content:
 弥散程度、
宇宙环境、
大尺度潮汐力场、
并合或吸积历史、
内禀动力学、
恒星形成和超大质量黑洞等一系列的物理性质相关 (例
如：Binney & Tremaine 1987; Djorgovski & Davis 1987; Dressler 
et al. 1987; Tremaine et al. 2002)。星系的形态和结构研究是现
124 
 
今理解星系形成与演化的一个重要途径 （Conselice 2014）
。 
早在1926年，
哈勃就已经建立了现代的星系形态分类构想，
将近
邻宇宙中的星系按照形态分为椭圆星系、
透镜星系和漩涡星系，
即哈
勃序列。
任一成功的星系演化模型必须能解释星系形态分布所呈现的
哈勃序列。
然而经过近一个世纪探索，
人们仍不清楚哈勃序列是如何
起源的。
研究哈勃序列的起源需要具有大样本高空间分辨率的深场图
像。
现有的巡天项目要么分辨率和深度有限
（如斯隆数字巡天SDSS）
，
要么视场有限（如哈勃空间望远镜HST）
。兼备一定深度（平均25.5星
等）
、大天区（主巡天17500平方度）和高空间分辨率（亚角秒级，与
HST类似）的中国空间站光学巡天望远镜（CSST）将会是研究哈勃序
列起源的理想设备。 
 
目视分类作为星系形态分类最早使用的方法，到现在仍被视作最
可靠的分类方法。
近年来，
国际上星系形态目视分类最为成功的例子
是
“Galaxy 
Zoo”
项
目
（
Lintott 
et 
al. 
2008; 
http://www.galaxyzoo.org/）
1，
该项目利用人工对斯隆数字巡天中
近百万个星系进行了目视形态分类。
除了目视分类，
一些非参数量化
形态参数也陆续被引入用于定量描述星系形态，如聚集度C、不对称
性A、团簇度S和基尼系数Gini等。然而，CSST观测到的海量星系图像
将无法使用现有的方法进行形态分类，
亟需开发快速有效的自动形态
分类方法和工具。 
 
要了解各个物理过程如何共同作用于星系的形成，需要对星系
各形态成分（核球、星系盘、旋臂、棒状结构和潮汐特征等）进行表
征并研究其在宇宙时间内的演化。
CSST的测光巡天将显著增大可以测
量内区子结构的星系样本数目，比如核心盘、核心棒、核

In [16]:
#html文件内容的QA对生成
file_list2 = [f"html_to_txt/html_txt{i}.txt" for i in range(43, 51)]
for file in file_list2:
    text_content = read_file(file)
    print(file)
    print ('text_content:\n', text_content)
    question_text = generate_question(text_content=text_content, more=False)
    print('question_text:\n', question_text)
    qa_text = generate_qa(text_content=text_content, question_text=question_text)
    print('qa_text:\n', qa_text)
    write_to_file(qa_text)

html_to_txt/html_txt43.txt
text_content:
 此条目或其章节极大或完全地依赖于某个单一的来源。请协助补充多方面可靠来源以改善这篇条目。致使用者：请搜索一下条目的标题（来源搜索："太阳过渡层成像光谱仪卫星" — 网页、新闻、书籍、学术、图像），以检查网络上是否存在该主题的更多可靠来源（判定指引）

太阳界面区成像光谱仪卫星想像图
太阳界面区成像光谱仪卫星（英语：Interface Region Imaging Spectrograph，缩写：IRIS）是美国国家航空航天局计划中的太阳观测卫星。该计划是小探测工程的探测器，负责观测太阳的过渡层，尤其是色球层的物理状态。洛克希德·马丁太阳与天文物理实验室（Lockheed Martin Solar and Astrophysics Laboratory，LMSAL）负责整合该太阳天文台。该太阳天文台本体和光谱仪是由LMSAL制造，望远镜则由史密松天体物理台提供。当卫星进入轨道后将由LMSAL和艾姆斯研究中心操作。


概要[编辑]
IRIS设计简图。
IRIS的主要仪器是一台高帧率的紫外线成像光谱仪，一幅耗时一秒所拍摄影像的空间分辨率是0.3角秒，相当于1埃以下。
NASA于2009年6月19日宣布IRIS从六个候选的小型探测计划中获选[1]，同时获选的还有 Gravity and Extreme Magnetism SMEX[2]。
IRIS 本体于2013年4月16日到达范登堡空军基地，并确定由飞马座火箭发射[3]。IRIS已于同年6月27日成功发射[4]。

IRIS团队[编辑]
IRIS的科学和工程团队参与单位如下[5]：

洛克希德·马丁太阳与天文物理实验室
洛克希德·马丁遥测与太空探险系统
史密松天体物理台
蒙大拿州立大学
奥斯陆大学理论天文物理研究所
美国国家大气研究中心高山天文台
史丹佛大学
艾姆斯研究中心
戈达德太空飞行中心
美国国家太阳天文台
加州大学柏克莱分校太空科学实验室
普林斯顿大学等离子物理实验室
悉尼大学悉尼天文研究所
荷语天主教鲁汶大学等离子天文物理学实验室
伦敦大学学院穆勒太空科学实验室
拉塞福－阿普顿实验室
欧洲空间局
马克斯·普朗克太阳系研究所
日本国家天文台
哥本哈根大学尼尔斯·波耳研究所
图集[编辑]



本次任务中将使用的飞马座火箭。




## 存入向量库

### 问答对清洗并存入df

In [None]:
def clean_json_content(content):
    # 去掉开头的 'json\n[' 和结尾的 ']'
    if content.startswith("```json\n"):
        content = content[8:].strip()  # 去掉 'json\n'
    if content.endswith('\n```'):
        content = content[:-4].strip()  # 去掉 '\n'
    return content

In [9]:
import os
import json
import pandas as pd

# 假设文件夹路径
folder_path = "QA_pairs"

# 初始化一个空的 DataFrame
df_qa = pd.DataFrame(columns=["question", "answer"])

# 遍历文件夹中的文件
for file_name in os.listdir(folder_path):
    file_path = os.path.join(folder_path, file_name)
    if os.path.isfile(file_path):
        try:
            text = read_file(file_path)
            data = clean_json_content(text)
            data_list = json.loads(data)
            # 确保数据是列表格式，并且每个元素是包含 "question" 和 "answer" 的字典
            if isinstance(data_list, list) and all(isinstance(item, dict) and "question" in item and "answer" in item for item in data_list):
                # 将数据追加到 DataFrame
                df_qa = pd.concat([df_qa, pd.DataFrame(data_list)], ignore_index=True)
            else:
                print(f"Skipping file {file_name}: Invalid format.")
        except Exception as e:
            print(f"Skipping file {file_name}: {e}")


In [10]:
df_qa

Unnamed: 0,question,answer
0,空间站望远镜积分场光谱仪主要观测什么？,空间站望远镜积分场光谱仪主要对近邻星系核球区的冷气体及其中心黑洞活动的直接观测结果。
1,超大质量黑洞的生长主要通过什么过程？,超大质量黑洞的生长是通过吸积过程。
2,积分场光谱技术主要用来证认什么？,积分场光谱技术用谱线的动力学特征证认内流与外流的存在和动力学属性。
3,空间站望远镜积分场光谱仪相比地面仪器有何优势？,空间站望远镜积分场光谱仪可以获得比地面类似仪器更好的空间分辨率，从而可以将类星体吸积和外流的...
4,当吸积率变化时，吸积过程会经历哪些阶段？,当吸积率从超爱丁顿吸积率变化到亚爱丁顿吸积率时，吸积过程分别经历了Slim盘、标准盘和ADA...
...,...,...
2275,多色成像2级处理流水线可以获得哪些高级数据产品？,多色成像2级处理流水线可以获得合并图像、合并图像测光星表、天体分类星表、恒星参数星表、星系测...
2276,合并星表流水线是基于什么数据生成的？,合并星表流水线是基于单次曝光测光星表生成的。
2277,PSF均匀化星表流水线最终生成什么数据产品？,PSF均匀化星表流水线最终生成PSF均匀化测光星表。
2278,合并图像数据流水线使用哪些图像进行合并？,合并图像数据流水线使用同一天区相同波段（可能来自不同探测器）的多次观测图像进行合并。


In [19]:
df_qa.to_csv('QA_pairs/QA_pairs.csv',encoding='utf-8')

In [11]:
len([file for file in os.listdir("QA_pairs")])

228

### 存入向量库

In [1]:
from transformers import BertModel

import torch
# 转换为BERT输入格式
def encode_question(question):
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    inputs = tokenizer(question, return_tensors="pt", truncation=True, padding=True, max_length=128)
    return inputs

# 获取句子向量
def get_sentence_embedding(question):
    inputs = encode_question(question)
    model = BertModel.from_pretrained('bert-base-uncased')
    with torch.no_grad():
        outputs = model(**inputs)
    # BERT最后一层的CLS token输出作为句子的表示
    sentence_embedding = outputs.last_hidden_state[:, 0, :]
    return sentence_embedding


  from .autonotebook import tqdm as notebook_tqdm


### 使用 paraphrase-MiniLM-L6-v2 嵌入

In [4]:
import os
import numpy as np
import pandas as pd
import faiss
from sentence_transformers import SentenceTransformer

# 加载数据
df_qa = pd.read_csv("QA_pairs/QA_pairs.csv") 
questions = df_qa["question"].tolist()

model = SentenceTransformer('sentence-transformers/paraphrase-MiniLM-L6-v2')

# 保存索引到磁盘的文件路径
index_file_path = "QA_pairs/faiss_index_st.index"
metadata_file_path = "QA_pairs/question_map_st.npy"  # 保存问题映射信息

# Step 1: 构建或加载索引
if os.path.exists(index_file_path) and os.path.exists(metadata_file_path):
    # 从磁盘加载索引和问题映射
    faiss_index = faiss.read_index(index_file_path)
    question_map = np.load(metadata_file_path, allow_pickle=True).item()
    print("Loaded FAISS index and metadata from disk.")

else:
    # 为问题生成嵌入向量
    print("Encoding questions:")
    question_embeddings = model.encode(questions, show_progress_bar=True)

    # 构建 FAISS 索引
    dimension = question_embeddings.shape[1]
    faiss_index = faiss.IndexFlatIP(dimension)   # 余弦相似性可以通过归一化向量后使用内积计算
    faiss.normalize_L2(question_embeddings)  # 归一化向量
    faiss_index.add(np.array(question_embeddings))

    # 保存问题的索引映射
    question_map = {i: question for i, question in enumerate(questions)}

    # 将索引和问题映射保存到磁盘
    faiss.write_index(faiss_index, index_file_path)
    np.save(metadata_file_path, question_map)
    print("Built and saved FAISS index and metadata to disk.")

# Step 2: 查询功能
def search_similar_questions(user_query, top_k=3):
    # 为用户查询生成嵌入向量
    user_embedding = model.encode([user_query])
    faiss.normalize_L2(user_embedding)
    # 在 FAISS 索引中检索
    distances, indices = faiss_index.search(np.array(user_embedding), top_k)
    # 返回检索结果
    results = [{"question": question_map[idx], "distance": dist} for dist, idx in zip(distances[0], indices[0])]
    return results

# 测试查询
user_query = "MCI有哪些工作包？"
top_k_results = search_similar_questions(user_query, top_k=3)

# 打印结果
for i, result in enumerate(top_k_results):
    print(f"Rank {i+1}:")
    print(f"Question: {result['question']}")
    print(f"Distance: {result['distance']}")
    print("=" * 50)




Loaded FAISS index and metadata from disk.
Rank 1:
Question: MCI图像评估工作包的输入信息有哪些？
Distance: 0.9332917332649231
Rank 2:
Question: MCI热设计的主要任务是什么？
Distance: 0.8946192264556885
Rank 3:
Question: MCI的宽带滤光片有哪些波段要求？
Distance: 0.8877360820770264


In [2]:
question_embeddings.shape

(2280, 384)