In [38]:
import os
import pandas as pd
import ijson  # For incremental JSON parsing
from jieba import cut_for_search
from datetime import datetime,timedelta
import math
import time
import re

In [25]:
# 获取脚本所在的绝对路径
#parent_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.getcwd()

# 获取上一级目录
base_dir = os.path.dirname(parent_dir)

print("脚本所在目录:", parent_dir)
print("脚本上一级目录:", base_dir)

# 定义子目录的路径
path = os.path.join(base_dir, "index", "jsons")
rank_path = os.path.join(base_dir, "rank")
spider_path = os.path.join(base_dir, "spider")

# 检查文件夹是否存在
if not os.path.exists(path):
    raise FileNotFoundError(f"Directory not found: {path}")
if not os.path.exists(rank_path):
    raise FileNotFoundError(f"Directory not found: {rank_path}")
if not os.path.exists(spider_path):
    raise FileNotFoundError(f"Directory not found: {spider_path}")

# Incremental JSON loader
def load_json_incrementally(file_path):
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")
    data = {}
    with open(file_path, 'r', encoding='utf-8') as f:
        parser = ijson.items(f, "item")
        for key, value in ijson.kvitems(f, ""):
            if key not in data:
                data[key] = value
    return data


# Load JSON files with memory efficiency
invert_index = load_json_incrementally(os.path.join(path, "invert_index.json"))
invert_index_title = load_json_incrementally(os.path.join(path, "invert_index_title.json"))
tf_idf = load_json_incrementally(os.path.join(path, "tf-idf.json"))
tf_idf_title = load_json_incrementally(os.path.join(path, "tf-idf_title.json"))
word_frequency = load_json_incrementally(os.path.join(path, "allTF.json"))
word_frequency_title = load_json_incrementally(os.path.join(path, "allTF_title.json"))
tf = load_json_incrementally(os.path.join(path, "tf.json"))
tf_title = load_json_incrementally(os.path.join(path, "tf_title.json"))
idf = load_json_incrementally(os.path.join(path, "idf.json"))
idf_title = load_json_incrementally(os.path.join(path, "idf_title.json"))

# 排序词集合
word_set = sorted(set(word_frequency.keys()))
word_set_title = sorted(set(word_frequency_title.keys()))

# 读取 CSV 文件
page_rank = pd.read_csv(os.path.join(rank_path, "page_rank_allnews.csv"), encoding='utf-8-sig')
all_info = pd.read_csv(os.path.join(spider_path, "all_news.csv"), encoding='utf-8')

print("数据加载完成！")

脚本所在目录: e:\fifth_semester\Information _Retrieve\2211123_张天歌_hw4\Search Engine\Search
脚本上一级目录: e:\fifth_semester\Information _Retrieve\2211123_张天歌_hw4\Search Engine
数据加载完成！


In [26]:
# 将 all_info 和 page_rank 转换为字典
all_info_dict = all_info.set_index('url').to_dict(orient='index')
page_rank_dict = page_rank.set_index('url')['page_rank'].to_dict()

In [27]:
all_info

Unnamed: 0,title,url,content,date,editor,description,doc_link
0,健康报：创新DNA存储方案破解医疗数据存储难题-天津大学新闻网,https://news.tju.edu.cn/info/1005/73323.htm,健康报讯（特约记者 李哲 通讯员 赵晖）天津大学合成生物学研究团队与天津市环湖医院携手，提出...,1.730477e+09,（编辑 刘延俊 贺泳迪）,健康报：创新DNA存储方案破解医疗数据存储难题-天津大学新闻网,
1,天津教育报：校企共探产教协同发展新路径-天津大学新闻网,https://news.tju.edu.cn/info/1005/73306.htm,天津教育报讯（记者 刘东岳 通讯员 潘静洲）11月20日，“新质生产力与人才培养论坛”在天开...,1.730477e+09,（编辑 焦德芳 郭新婷）,天津教育报：校企共探产教协同发展新路径-天津大学新闻网,
2,中国新闻网：新质生产力与人才培养论坛召开，校企共探产教协同发展新路径-天津大学新闻网,https://news.tju.edu.cn/info/1005/73309.htm,中新网天津新闻11月21日电 11月20日，“新质生产力与人才培养论坛”在天开高教科创园召开...,1.730477e+09,（编辑 刘延俊 贺泳迪）,中国新闻网：新质生产力与人才培养论坛召开，校企共探产教协同发展新路径-天津大学新闻网,
3,央广网：新质生产力与人才培养论坛在津召开-天津大学新闻网,https://news.tju.edu.cn/info/1005/73308.htm,央广网讯（记者 张强）11月20日，“新质生产力与人才培养论坛”在天开高教科创园召开。本次论...,1.730477e+09,（编辑 焦德芳 郭新婷）,央广网：新质生产力与人才培养论坛在津召开-天津大学新闻网,
4,中国科学报：校企共商新质生产力与人才培养-天津大学新闻网,https://news.tju.edu.cn/info/1005/73307.htm,中国科学报讯（记者 潘静洲 陈彬）11月20日，“新质生产力与人才培养论坛”在天津市天开高教...,1.730477e+09,（编辑 焦德芳 郭新婷）,中国科学报：校企共商新质生产力与人才培养-天津大学新闻网,
...,...,...,...,...,...,...,...
102842,天津市来校督导检查秋季学期开学准备工作-南开要闻-南开大学,http://news.nankai.edu.cn/ywsd/system/2018/09/...,通讯员 王芃 摄影 聂际慈)9月4日，天津市教委、市政府教育督导室督导检查组来我校督导检查秋...,1.536077e+09,蓝芳,南开新闻网讯(通讯员 王芃 摄影 聂际慈)9月4日，天津市教委、市政府教育督导室督导检查组来...,
102843,南开大学一批教师和集体获得天津市荣誉表彰-南开要闻-南开大学,http://news.nankai.edu.cn/ywsd/system/2018/09/...,(记者 马超)在第34个教师节到来之际，南开大学教师获奖消息不断、捷报频传。继化学学院教授周...,1.536077e+09,蓝芳,南开新闻网讯(记者 马超)在第34个教师节到来之际，南开大学教师获奖消息不断、捷报频传。继化...,
102844,校领导出访韩国-南开要闻-南开大学,http://news.nankai.edu.cn/ywsd/system/2018/09/...,(通讯员 张萌)8月30日至9月3日，副校长朱光磊率团出访韩国仁川市、首尔市及釜山市高校，并...,1.536163e+09,蓝芳,南开新闻网讯(通讯员 张萌)8月30日至9月3日，副校长朱光磊率团出访韩国仁川市、首尔市及釜...,
102845,佛系“南开人”实力开战 勇夺《一站到底》世界名校争霸赛冠军-南开要闻-南开大学,http://news.nankai.edu.cn/ywsd/system/2018/09/...,(记者 郝静秋)9月3日晚，在江苏卫视《一站到底》世界名校争霸赛第五季总决赛中，南开大学商学...,1.536077e+09,蓝芳,南开新闻网讯(记者 郝静秋)9月3日晚，在江苏卫视《一站到底》世界名校争霸赛第五季总决赛中，...,


In [28]:

# 这个函数主要用于计算输入字符串和历史记录的TF值，文档库的TF值和IDF值我们已经通过前期数据处理得到
def getTF(words, input):
    # 初始化一个词频字典，key为传入的所有的词，value为0
    tf = dict.fromkeys(words, 0)
    for word in input:
        if word in words:
            tf[word] += 1
    for word, count in tf.items():
        tf[word] = math.log10(count + 1)
    return tf

def getTF_IDF(tf, idf):
    tfidf = {}
    for word, count in tf.items():
        tfidf[word] = float(count) * float(idf[word])
    return tfidf

def getVecLength(key:list)->float:
    """
    :param key: 关键词列表
    :return: 向量长度
    """
    length = 0
    for i in range(len(key)):
        length = length + key[i][1]**2
    return round(math.sqrt(length),2)

In [29]:
def simple_search(input: str, history: list, onlyTitle: bool = False, num: int = 100):
    """
    :param input: 用户输入的查询字符串
    :param history: 检索历史列表
    :param onlyTitle: 是否启动仅在标题中检索
    :param num：返回结果的数量，默认为100条最相似的
    :return: 一个列表，元素为URL和相似度组成的元组
    """
    
    # 对输入中的通配符替换为正则表达式
    wildcard_to_regex = lambda s: re.sub(r"(\*)", ".*", re.sub(r"(\?)", ".", re.escape(s)))
    regex_query = wildcard_to_regex(input)
    
    # Debug: 输出正则表达式
    print(f"Converted wildcard input to regex: {regex_query}")
    
    regex = r'[\.\^\$\*\+\?\{\}\[\]\|\(\)]'
    isRe = re.search(regex, input)
    if isRe is not None:
        input = re.sub(regex, '', input)
    
    spilt_input = sorted(list(cut_for_search(input)))
    spilt_input = [term for term in spilt_input if term not in ["", " "]]
    spilt_history = []
    for i in range(len(history)):
        ls = list(cut_for_search(history[i]))
        ls = [term for term in ls if term not in ["", " "]]
        spilt_history.extend(ls)
    
    # 输出分词结果
    print(f"Spilt input: {spilt_input}")
    print(f"Spilt history: {spilt_history}")
    
    # 判断用户需要的搜索模式
    if onlyTitle:
        tf_dict = tf_title  # 读取的json数据
        idf_dict = idf_title
        words = word_set_title
    else:
        tf_dict = tf
        idf_dict = idf
        words = word_set
    
    tfidf_dict = {}
    for key, value in tf_dict.items():
        tfidf_dict[key] = getTF_IDF(value, idf_dict)
    
    # 存储关键词的tfidf值，找到num个最大的
    key_tfidf_dict = {}
    for key, value in tfidf_dict.items():
        key_tfidf_dict[key] = sorted(tfidf_dict[key].items(), key=lambda item: item[1], reverse=True)[0:num]
    
    # 保存关键词字典的key与value
    key_tfidf_dict_keys = list(key_tfidf_dict.keys())  # 即url组成的列表
    key_tfidf_dict_values = list(key_tfidf_dict.values())
    
    # 用户输入查询的TFIDF
    tf_input = getTF(words, spilt_input)
    tfidf_input = getTF_IDF(tf_input, idf_dict)
    key_input = sorted(tfidf_input.items(), key=lambda item: item[1], reverse=True)[0:num]
    len_key_input = getVecLength(key_input)
    
    # Validate input vector length
    if len_key_input == 0:
        raise ValueError("Input query resulted in an empty vector. Please refine your search query.")
    
    # 历史记录的TFIDF
    tf_history = getTF(words, spilt_history)
    tfidf_history = getTF_IDF(tf_history, idf_dict)
    key_history = sorted(tfidf_history.items(), key=lambda item: item[1], reverse=True)[0:num]
    len_key_history = getVecLength(key_history)
    
    # 如果词库里没有搜索项，那么返回错误
    if len_key_input == 0:
        print("调试信息：key_input为空，用户输入的关键词未在词库中找到。")
        raise KeyError("用户输入的关键词未找到，请检查索引或输入的正确性。")
    
    # 向量空间模型，计算余弦相似度
    key_results = []  # 用于存储余弦相似度
    key_results_index = []  # 记录文档索引
    for i in range(len(key_tfidf_dict_keys)):
        length = 0
        temp_list = key_tfidf_dict_values[i]
        # 遍历每个输入关键词
        for key in key_input:
            if key[1] != 0:  # tf-idf值不为0才存在相似度
                # 遍历文档内的每个关键词
                for value in temp_list:
                    if key[0] == value[0]:
                        length = length + key[1] * value[1]
        res = getVecLength(temp_list)
        if res == 0.0:
            continue
        # 余弦相似度
        sim = round(length / (len_key_input * res), 4)
        
        key_results.append((key_tfidf_dict_keys[i], sim))
        if sim > 0:
            key_results_index.append(i)
    
    if len(history) > 0:
        history_results_dict = {}
        for item in key_results_index:
            length = 0
            temp_list = key_tfidf_dict_values[item]
            for _key_history in key_history:
                if _key_history[1] != 0:
                    for value in temp_list:
                        if _key_history[0] == value[0]:
                            length = length + _key_history[1] * value[1]
            sim = round(length / (len_key_history * getVecLength(temp_list)), 4)
            history_results_dict[item] = (key_tfidf_dict_keys[item], sim)
        
        results = []
        for i in range(len(key_tfidf_dict_keys)):
            if i >= len(key_results):
                break
            #print(f"Key results输出结果调试: {key_results[i][1]}")
            if key_results[i][1] == 0:
                pass
            elif j := history_results_dict.get(i):
                # 设置历史记录的权重为0.1
                results.append((key_results[i][0], key_results[i][1] + j[1] / 10))
            else:
                results.append((key_results[i][0], key_results[i][1]))
        results = sorted(results, key=lambda item: item[1], reverse=True)
    # 没有历史记录时，直接利用字典计算余弦相似度即可
    else:
        results = []
        for i in range(len(key_tfidf_dict_keys)):
            if i >= len(key_results):
                break
            results.append((key_results[i][0], key_results[i][1]))
        results = sorted(results, key=lambda item: item[1], reverse=True)
    
    ls, ans = [], []
    for res in results:
        if res[1] > 0:
            ls.append((res[0], res[1]))
    if isRe is not None:
        for item in ls:
            row = all_info_dict.get(item[0])
            # if re.search(escaped_input, str(row['title'])) is not None or re.search(escaped_input, str(row['description'])) is not None or re.search(escaped_input, str(row['description'])) is not None:
            #     ans.append(item)
            if re.search(regex_query, row['title']) or re.search(regex_query, row['description']):
                ans.append(item)
    if isRe is None:
        return ls
    return ans


In [30]:
def simple_search_test(input:str,history:list):
    time1 = time.time()
    ret = simple_search(input,history)
    print("在全文中出现的结果：")
    time2 = time.time()
    for item in ret:
        print(item)
    print("在"+str(time2-time1)+"秒时间内响应，返回"+str(len(ret))+"项结果")

    time1 = time.time()
    ret = simple_search(input,history,True)
    time2 = time.time()
    print("仅在标题中出现的结果：")
    for item in ret:
        print(item)
    print("在" + str(time2 - time1) + "秒时间内响应，返回" + str(len(ret)) + "项结果")

In [31]:
def expand_results(results: list):
    expanded = []
    for res in results:
        url = res[0]
        
        # 从字典中查找 all_info
        row = all_info_dict.get(url)
        if row is None:
            print(f"URL {url} 不存在于 all_info 中，跳过该项。")
            continue
        
        title = str(row['title']).replace("_", "/")
        dsp = str(row['description'])
        
        # 从字典中查找 page_rank 值，默认为 0
        page_rank_value = page_rank_dict.get(url, 0)
        if page_rank_value == 0:
            print(f"URL {url} 不存在于 page_rank 中，使用默认值 0 计算综合得分。")
        
        # 计算综合得分
        score = res[1] * 0.7 + 0.3 * page_rank_value
        expanded.append((title, url, dsp, score))
    
    # 按综合得分排序
    return sorted(expanded, key=lambda item: item[-1], reverse=True)


# 测试函数：接收输入和历史记录，并打印结果
def expand_results_test(input: str, history: list):
    results = simple_search(input, history, True)  # 假设 simple_search 返回一个结果列表
    expanded = expand_results(results)
    for entry in expanded:
        print(entry)


In [32]:
# 带有发布时间限制的搜索函数
def check_time(result,limit):
    """
    :param result: simple_search返回结果拓展后的结果的一行
    :param limit：时间限制字符串
    :return: 是否满足要求
    """
    row = all_info_dict.get(result[1])
    if str(row['date']) != "nan":
        # 将时间戳转换为datetime
        articleTime = datetime.fromtimestamp(int(row['date']))
        res = datetime.now() - articleTime
        if limit == "一周内":
            if res > timedelta(days=7):
                return False
        elif limit == "一个月内":
            if res > timedelta(days=30):
                return False
        elif limit == "一年内":
            if res > timedelta(days=365):
                return False
    if str(row['date']) == "nan":
        return False
    return True

def check_time_test(input,limit):
    ret = simple_search(input,[])
    expanded = expand_results(ret)
    print("时间限制添加前的结果，共有"+str(len(expanded))+"条：")
    for item in expanded:
        print(item)
    expanded = [item for item in expanded if check_time(item,limit)==True]
    print("时间限制添加后的结果，共有"+str(len(expanded))+"条：")
    for item in expanded:
        print(item)


In [33]:
# 检查是不是指定的域名或者网站
def check_website(result,name):
    if name not in result[1]:
        return False
    return True

def check_website_test(input,name):
    ret = simple_search(input,[])
    expanded = expand_results(ret)
    print(f"网站或域名限制前的结果，共有{len(expanded)}条：")
    for item in expanded:
        print(item)
    expanded = [item for item in expanded if check_website(item,name)==True]
    print(f"网站或域名限制后的结果，共有{len(expanded)}条：")
    for item in expanded:
        print(item)

In [34]:
# 检查是否和规定的词匹配，传入一个标志位代表是否进行完全匹配
# 如果不进行完全匹配，那么只要出现一个词就可以判定为True
def check_match_words(result,input,complete=True):
    row = all_info_dict.get(result[1])  
    text = f"{row['title']}#{row['description']}#{row['content']}#{row['editor']}"
    ls = str(input).split(" ")
    for word in ls:
        if word == '#':
            pass
        if word not in text:
            if complete == True:
                return False
        if word in text:
            if complete == False:
                return True
    if complete == True:
        return True
    return False

def check_complete_match_test(input,limit):
    ret = simple_search(input,[])
    expanded = expand_results(ret)
    print(f"限制前的结果，共有{len(expanded)}条：")
    expanded = [item for item in expanded if check_match_words(item,limit,True) == True]
    print(f"包含以下所有词限制后的结果，共有{len(expanded)}条：")
    for item in expanded:
        print(item)

In [35]:
# 检查是否不含一些词
def check_not_include(result,input):
    row = all_info_dict.get(result[1])
    text = f"{row['title']}#{row['description']}#{row['content']}#{row['editor']}"
    ls = str(input).split(" ")
    ls = [word for word in ls if word != '']
    for word in ls:
        if word == '#':
            pass
        if word in text:
            return False
    return True

In [36]:
ret = simple_search("运动",['陈雨露'])
# ret = expand_results(ret)
# ret = [item for item in ret if check_time(item,"一年内")]
# print(f"过滤时间后，剩余{len(ret)}条")
# ret = [item for item in ret if check_website(item,"nankai")]
# print(f"过滤来源后，剩余{len(ret)}条")
# ret = [item for item in ret if check_match_words(item,"校长 大学",True)]
# for item in ret:
#     print(item)
# print(f"过滤必须词后，剩余{len(ret)}条")
# check_complete_match_test("运动会","\"校长\"")

Converted wildcard input to regex: 运动
Spilt input: ['运动']
Spilt history: ['陈', '雨露']


In [37]:
ret

[('https://news.tju.edu.cn/info/1003/22222.htm', 0.3453),
 ('https://news.tju.edu.cn/info/1003/23312.htm', 0.3453),
 ('https://news.tju.edu.cn/info/1017/53602.htm', 0.3388),
 ('https://news.tju.edu.cn/info/1005/19543.htm', 0.3123),
 ('http://news.nankai.edu.cn/zhxw/system/2013/11/01/000150335.shtml', 0.3085),
 ('http://news.nankai.edu.cn/zhxw/system/2013/09/27/000142434.shtml', 0.304),
 ('https://news.tju.edu.cn/info/1016/35976.htm', 0.301),
 ('http://news.nankai.edu.cn/zhxw/system/2011/04/19/000038444.shtml', 0.2994),
 ('http://news.nankai.edu.cn/zhxw/system/2013/09/27/000142435.shtml', 0.2915),
 ('http://news.nankai.edu.cn/mtnk/system/2019/04/29/030033149.shtml', 0.2901),
 ('https://news.tju.edu.cn/info/1017/53788.htm', 0.2871),
 ('http://news.nankai.edu.cn/zhxw/system/2013/10/22/000147795.shtml', 0.2849),
 ('https://news.tju.edu.cn/info/1017/53794.htm', 0.2792),
 ('https://news.tju.edu.cn/info/1017/35273.htm', 0.278),
 ('http://news.nankai.edu.cn/mtnk/system/2015/12/10/000259997.sht

In [39]:
import re

def search_documents(input_query: str, all_info_dict: dict):
    """
    查询包含附件链接的条目，支持对附件标题、描述、链接的搜索。
    
    :param input_query: 用户输入的查询关键字或正则表达式
    :param all_info_dict: 包含所有网页数据的字典
    :return: 带有附件链接的查询结果列表
    """
    # 转换通配符为正则表达式
    wildcard_to_regex = lambda s: re.sub(r"(\*)", ".*", re.sub(r"(\?)", ".", re.escape(s)))
    regex_query = wildcard_to_regex(input_query)
    
    # 存放匹配结果
    matched_results = []
    
    for key, row in all_info_dict.items():
        doc_link = row.get("doc_link")
        if pd.isna(doc_link):  # 如果 doc_link 为空，则跳过
            continue
        
        # 搜索匹配：标题、描述、附件链接
        title = row.get("title", "")
        description = row.get("description", "")
        if (re.search(regex_query, title) or
            re.search(regex_query, description) or
            re.search(regex_query, doc_link)):
            matched_results.append({
                "title": title,
                "url": row.get("url"),
                "description": description,
                "doc_link": doc_link
            })
    
    return matched_results


In [48]:
# 查询所有 PDF 附件
results = search_documents("*.pdf", all_info_dict)
results1 = search_documents("onlinelibrary", all_info_dict)#搜索链接名
print(results)

[{'title': '今晚报：简讯-天津大学新闻网', 'url': None, 'description': '今晚报：简讯-天津大学新闻网', 'doc_link': ' http://epaper.jwb.com.cn/jwb/resfile/2018-12-17/02/jwb2018121702.pdf'}, {'title': '天津大学生命科学学院王亚鑫、叶升团队在高致病性病毒纳米抗体研究中取得系列进展-天津大学新闻网', 'url': None, 'description': '天津大学生命科学学院王亚鑫、叶升团队在高致病性病毒纳米抗体研究中取得系列进展-天津大学新闻网', 'doc_link': 'https://www.nature.com/articles/s41467-024-51414-6.pdf'}]


: 

In [3]:
import pandas as pd
def load_file_links_from_csv(file_path):  
    """  
    Load file links from a CSV file and return as a DataFrame.  

    Parameters:  
    file_path (str): The path to the CSV file.  

    Returns:  
    DataFrame: A DataFrame containing file links.  
    """  
    return pd.read_csv(file_path)  

def search_file_link(file_df, query):  
    """  
    Search for a file link based on the query in the DataFrame.  

    Parameters:  
    file_df (DataFrame): The DataFrame containing file links.  
    query (str): The query to search for.  

    Returns:  
    dict: A dictionary with link and description if found, else None.  
    """  
     # 处理缺失值  
    file_df = file_df.dropna(subset=['title'])  
    filtered_files = file_df[file_df['title'].str.contains(query, case=False)]  
    if not filtered_files.empty:  
        return filtered_files.iloc[0].to_dict()  
    return None  

def test_file_link_search(query, file_df):  
    file_link = search_file_link(file_df, query)  
    if file_link:  
        print(f"文件链接: {file_link['url']} - {file_link['title']}")  
    else:  
        print("未找到匹配的文件链接。")  

In [8]:
# Step 1: Load file links from CSV  
output_csv_file = '../spider/file_links.csv'  # CSV 文件的路径  
file_sparse_links = load_file_links_from_csv(output_csv_file)  

# Step 2: Test search function  
test_query = "新冠肺炎防控指南漫画（汉西双语）"  # 示例查询  
test_file_link_search(test_query, file_sparse_links)

文件链接: http://n1.sinaimg.cn/tj/7d076bde/20200702/xi.pdf - 新冠肺炎防控指南漫画（汉西双语）
