In [1]:
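# 1. Parse the HTML files exported by SingleFile and generate an RIS file from them.
# Scopus currently does not support bulk export of arXiv records; although arXiv papers are preprints, some are highly cited and worth referencing.
# Runtime: 51 s for 1326 records.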
import re
import os
from datetime import datetime
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor

# Clean up scraped text
def clean_text(text):
    """Collapse every run of whitespace in *text* to one space and trim the ends.

    Falsy input (``None`` or an empty string) yields an empty string.
    """
    return re.sub(r'\s+', ' ', text).strip() if text else ""

# Process one HTML file and extract its bibliographic records
def process_html_file(filepath):
    """Extract bibliographic records from one saved search-results HTML file.

    Every ``<tr class="TableItems-module__m0Z0b">`` row that contains a title
    heading (``<h3>``) becomes one record. A source or year container without
    a ``<span>`` yields an empty string for that field instead of raising
    ``AttributeError``.

    Args:
        filepath: Path of a UTF-8 encoded HTML file.

    Returns:
        list[dict]: One dict per record with the keys ``title``, ``authors``
        (list of names), ``publisher``, ``year``, ``abstract`` and
        ``current_time`` (a timestamp taken once per file, formatted as
        ``%Y/%m/%d/%H:%M:%S``).
    """
    def first_span_text(container):
        """Return the cleaned text of the first <span> in *container*, or ''."""
        span = container.find('span') if container is not None else None
        return clean_text(span.get_text(strip=True)) if span is not None else ''

    with open(filepath, "r", encoding="utf-8") as f:
        soup = BeautifulSoup(f.read(), 'html.parser')

    current_time = datetime.now().strftime("%Y/%m/%d/%H:%M:%S")
    # Compiled once instead of once per row.
    preprint_marker = re.compile('Preprint.*开放获取')
    documents = []

    # NOTE(review): the class names look like build-generated CSS-module
    # hashes; presumably they change when the site is redeployed - confirm.
    for row in soup.find_all('tr', class_='TableItems-module__m0Z0b'):
        # Skip "Preprint ... open access" marker rows that carry no title.
        if row.find('span', string=preprint_marker) and row.find('h3') is None:
            continue

        title_div = row.find('div', class_='TableItems-module__sHEzP')
        heading = title_div.find('h3') if title_div is not None else None
        if heading is None:
            continue

        authors = []
        author_div = row.find('div', class_='author-list')
        if author_div is not None:
            for button in author_div.find_all('button'):
                name_span = button.find('span', class_='Typography-module__lVnit')
                if name_span is not None:
                    authors.append(clean_text(name_span.get_text(strip=True)))

        # Walk the following <tr> elements until one contains an abstract.
        # NOTE(review): the walk does not stop at the next record's title row,
        # so a record without an abstract picks up the next abstract found
        # further down the page - confirm this is acceptable.
        abstract = ''
        candidate = row.find_next('tr')
        while candidate is not None:
            abstract_div = candidate.find('div', class_='Abstract-module__ukTwj')
            if abstract_div is not None:
                abstract = clean_text(abstract_div.get_text(strip=True))
                break
            candidate = candidate.find_next('tr')

        documents.append({
            'title': first_span_text(heading),
            'authors': authors,
            'publisher': first_span_text(
                row.find('div', class_='DocumentResultsList-module__tqiI3')),
            'year': first_span_text(
                row.find('div', class_='TableItems-module__TpdzW')),
            'abstract': abstract,
            'current_time': current_time,
        })
    return documents

# Write the extracted records in RIS format
def write_ris(documents, output_path):
    """Append every document in *documents* to *output_path* as an RIS record.

    The file is opened in append mode, so existing content is kept. Each
    record is typed ``GEN``; ``AB``, ``PB`` and ``PY`` are written only when
    the corresponding value is non-empty, and ``ST`` is the part of the title
    before the first colon.

    Args:
        documents: Iterable of dicts with the keys ``authors``, ``title``,
            ``abstract``, ``publisher``, ``year`` and ``current_time``.
        output_path: Path of the RIS file to append to (UTF-8).
    """
    optional_tags = (("AB", 'abstract'), ("PB", 'publisher'), ("PY", 'year'))

    def ris_lines(doc):
        """Yield the lines of one RIS record, in output order."""
        yield "TY  - GEN\n"
        for author in doc['authors']:
            yield f"AU  - {author}\n"
        yield f"TI  - {doc['title']}\n"
        for tag, key in optional_tags:
            if doc[key]:
                yield f"{tag}  - {doc[key]}\n"
        # partition() returns the whole title when it contains no colon.
        short_title = doc['title'].partition(":")[0]
        yield f"ST  - {short_title}\n"
        yield f"Y2  - {doc['current_time']}\n"
        yield "ER  -\n\n"

    with open(output_path, "a", encoding="utf-8") as f:
        for doc in documents:
            f.writelines(ris_lines(doc))

# Entry point: parse several HTML files with a thread pool and merge them into one RIS file
def main():
    """Parse the hard-coded list of HTML files and write all records to one RIS file.

    The files are handed to ``process_html_file`` through a thread pool, the
    per-file results are concatenated in list order, passed to ``write_ris``,
    and a summary line is printed.
    """
    ris_output_path = "../../data/arxiv/html_result/arxiv_results_multi.ris"
    html_dir = "../../data/arxiv/html"

    filenames = [
        "Scopus - 文献搜索结果 ｜ 已登录 (2025_5_5 21：53：23).html",
        "Scopus - 文献搜索结果 ｜ 已登录 (2025_5_5 21：53：02).html",
        "Scopus - 文献搜索结果 ｜ 已登录 (2025_5_5 21：52：37).html",
        "Scopus - 文献搜索结果 ｜ 已登录 (2025_5_5 21：52：11).html",
        "Scopus - 文献搜索结果 ｜ 已登录 (2025_5_5 21：51：47).html",
        "Scopus - 文献搜索结果 ｜ 已登录 (2025_5_5 21：51：20).html",
        "Scopus - 文献搜索结果 ｜ 已登录 (2025_5_5 21：49：05).html",
    ]
    filepaths = [os.path.join(html_dir, name) for name in filenames]

    # Process the files in worker threads; map() yields results in input order.
    with ThreadPoolExecutor(max_workers=7) as executor:
        all_documents = [
            doc
            for docs in executor.map(process_html_file, filepaths)
            for doc in docs
        ]

    # Write the merged records as RIS
    write_ris(all_documents, ris_output_path)
    print(f"已成功处理 {len(filepaths)} 个文件，共导出 {len(all_documents)} 条文献。")

# Run the export only when this code is executed directly, not when it is imported.
if __name__ == "__main__":
    main()


已成功处理 7 个文件，共导出 1326 条文献。


In [4]:
# 2. After each Scopus update, list the arXiv entries that are new compared with the existing RIS file and write them to a separate RIS file.
import re
import os
from datetime import datetime
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor

# Clean up scraped text
def clean_text(text):
    """Return *text* with whitespace runs collapsed to single spaces and trimmed.

    ``None`` and the empty string both produce an empty string.
    """
    collapsed = re.sub(r'\s+', ' ', text) if text else ""
    return collapsed.strip()

# Parse an RIS file and extract its entries
def parse_ris_file(ris_filepath):
    """Parse an RIS file into a list of per-record dicts.

    A record starts at a ``TY`` line and ends at an ``ER`` line; neither of
    those two tags is stored. A tag that occurs once maps to a string, a tag
    that occurs several times maps to a list of strings in file order.

    Compared with a plain ``split(" - ")`` this parser also accepts:

    * tags with an empty value (``"AB  - "``), stored as ``''``;
    * lines without a tag, which are treated as a continuation of the
      previous field and appended to it with a single space;
    * a record that is not terminated by ``ER`` (it is kept, not dropped);
    * a UTF-8 byte-order mark at the start of the file.

    Args:
        ris_filepath: Path of the RIS file (UTF-8, with or without BOM).

    Returns:
        list[dict]: One dict per non-empty record.
    """
    # Two-character RIS tag, whitespace, a dash, then an optional value.
    tag_line = re.compile(r'^([A-Z][A-Z0-9])\s+-(?:\s+(.*))?$')

    entries = []
    current_entry = {}
    last_key = None

    def flush():
        """Store the record collected so far (if any) and start a new one."""
        nonlocal current_entry, last_key
        if current_entry:
            entries.append(current_entry)
        # A fresh dict, so later lines can never mutate a stored record.
        current_entry = {}
        last_key = None

    # "utf-8-sig" strips a leading BOM and reads BOM-less files unchanged.
    with open(ris_filepath, "r", encoding="utf-8-sig") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                continue
            if line.startswith("TY  -") or line.startswith("ER  -"):
                flush()
                continue

            match = tag_line.match(line)
            if match is None:
                # Wrapped text: glue it onto the most recent field value.
                if last_key is not None:
                    previous = current_entry[last_key]
                    if isinstance(previous, list):
                        previous[-1] = f"{previous[-1]} {line}".strip()
                    else:
                        current_entry[last_key] = f"{previous} {line}".strip()
                continue

            key = match.group(1)
            value = (match.group(2) or "").strip()
            if key not in current_entry:
                current_entry[key] = value
            elif isinstance(current_entry[key], list):
                current_entry[key].append(value)
            else:
                current_entry[key] = [current_entry[key], value]
            last_key = key

    flush()  # keep a trailing record that has no ER line
    return entries

# Process one HTML file and extract its bibliographic records
def process_html_file(filepath):
    """Extract bibliographic records from one saved search-results HTML file.

    Every ``<tr class="TableItems-module__m0Z0b">`` row that contains a title
    heading (``<h3>``) becomes one record. A source or year container without
    a ``<span>`` yields an empty string for that field instead of raising
    ``AttributeError``.

    Args:
        filepath: Path of a UTF-8 encoded HTML file.

    Returns:
        list[dict]: One dict per record with the keys ``title``, ``authors``
        (list of names), ``publisher``, ``year``, ``abstract`` and
        ``current_time`` (a timestamp taken once per file, formatted as
        ``%Y/%m/%d/%H:%M:%S``).
    """
    def first_span_text(container):
        """Return the cleaned text of the first <span> in *container*, or ''."""
        span = container.find('span') if container is not None else None
        return clean_text(span.get_text(strip=True)) if span is not None else ''

    with open(filepath, "r", encoding="utf-8") as f:
        soup = BeautifulSoup(f.read(), 'html.parser')

    current_time = datetime.now().strftime("%Y/%m/%d/%H:%M:%S")
    # Compiled once instead of once per row.
    preprint_marker = re.compile('Preprint.*开放获取')
    documents = []

    # NOTE(review): the class names look like build-generated CSS-module
    # hashes; presumably they change when the site is redeployed - confirm.
    for row in soup.find_all('tr', class_='TableItems-module__m0Z0b'):
        # Skip "Preprint ... open access" marker rows that carry no title.
        if row.find('span', string=preprint_marker) and row.find('h3') is None:
            continue

        title_div = row.find('div', class_='TableItems-module__sHEzP')
        heading = title_div.find('h3') if title_div is not None else None
        if heading is None:
            continue

        authors = []
        author_div = row.find('div', class_='author-list')
        if author_div is not None:
            for button in author_div.find_all('button'):
                name_span = button.find('span', class_='Typography-module__lVnit')
                if name_span is not None:
                    authors.append(clean_text(name_span.get_text(strip=True)))

        # Walk the following <tr> elements until one contains an abstract.
        # NOTE(review): the walk does not stop at the next record's title row,
        # so a record without an abstract picks up the next abstract found
        # further down the page - confirm this is acceptable.
        abstract = ''
        candidate = row.find_next('tr')
        while candidate is not None:
            abstract_div = candidate.find('div', class_='Abstract-module__ukTwj')
            if abstract_div is not None:
                abstract = clean_text(abstract_div.get_text(strip=True))
                break
            candidate = candidate.find_next('tr')

        documents.append({
            'title': first_span_text(heading),
            'authors': authors,
            'publisher': first_span_text(
                row.find('div', class_='DocumentResultsList-module__tqiI3')),
            'year': first_span_text(
                row.find('div', class_='TableItems-module__TpdzW')),
            'abstract': abstract,
            'current_time': current_time,
        })
    return documents

# Write the extracted records in RIS format
def write_ris(documents, output_path):
    """Write every document in *documents* to *output_path* as an RIS record.

    The file is opened in write mode, so any previous content is replaced.
    Each record is typed ``GEN``; ``AB``, ``PB`` and ``PY`` are written only
    when the corresponding value is non-empty, and ``ST`` is the part of the
    title before the first colon.

    Args:
        documents: Iterable of dicts with the keys ``authors``, ``title``,
            ``abstract``, ``publisher``, ``year`` and ``current_time``.
        output_path: Path of the RIS file to create or overwrite (UTF-8).
    """
    optional_tags = (("AB", 'abstract'), ("PB", 'publisher'), ("PY", 'year'))

    def ris_lines(doc):
        """Yield the lines of one RIS record, in output order."""
        yield "TY  - GEN\n"
        for author in doc['authors']:
            yield f"AU  - {author}\n"
        yield f"TI  - {doc['title']}\n"
        for tag, key in optional_tags:
            if doc[key]:
                yield f"{tag}  - {doc[key]}\n"
        # partition() returns the whole title when it contains no colon.
        short_title = doc['title'].partition(":")[0]
        yield f"ST  - {short_title}\n"
        yield f"Y2  - {doc['current_time']}\n"
        yield "ER  -\n\n"

    with open(output_path, "w", encoding="utf-8") as f:
        for doc in documents:
            f.writelines(ris_lines(doc))

# Detect duplicates and return only the documents that are not duplicates
def deduplicate_documents(new_documents, existing_ris_filepath):
    """Return the documents whose title is not already known, plus a duplicate count.

    Titles are compared case-insensitively (``str.lower``). A document counts
    as a duplicate when its title occurs in the existing RIS file *or* earlier
    in ``new_documents``; documents with an empty title are never treated as
    duplicates of each other.

    Args:
        new_documents: Iterable of dicts that each have a ``title`` string.
        existing_ris_filepath: Path of the RIS file to compare against; it is
            read with ``parse_ris_file``.

    Returns:
        tuple[list, int]: ``(unique_documents, duplicate_count)``.
    """
    seen_titles = set()
    for entry in parse_ris_file(existing_ris_filepath):
        titles = entry.get('TI', [])
        # The parser returns a list when a record repeats the TI tag.
        if isinstance(titles, str):
            titles = [titles]
        seen_titles.update(title.lower() for title in titles)

    unique_documents = []
    duplicate_count = 0
    for doc in new_documents:
        key = doc['title'].lower()
        if key in seen_titles:
            duplicate_count += 1
            continue
        unique_documents.append(doc)
        if key:
            # Remember it so a repeat later in this batch is caught as well.
            seen_titles.add(key)

    return unique_documents, duplicate_count

# Entry point
def main():
    """Extract records from the listed HTML files and export those not yet in the RIS file.

    The HTML files are parsed through a thread pool, compared against the
    existing RIS file with ``deduplicate_documents``, and the remaining
    records are written with ``write_ris`` (only when there is at least one).
    Three summary lines are printed at the end.
    """
    # Paths (to be adjusted by the user)
    html_dir = "../../data/arxiv/html"  # directory with the HTML files
    existing_ris_filepath = "../../data/arxiv/html_result/arxiv_results_multi_1635_tad_tal.ris"  # existing RIS file
    output_ris_filepath = "../../data/arxiv/html_result/unique_arxiv_results.ris"  # output RIS file for the new records

    # HTML files to process (example; replace with the actual file names)
    filenames = [
        "Scopus - 文献搜索结果 ｜ 已登录 (2025_5_9 16：50：37).html",
    ]
    filepaths = [os.path.join(html_dir, name) for name in filenames]

    # Parse the HTML files in worker threads; results arrive in input order.
    with ThreadPoolExecutor(max_workers=4) as executor:
        all_documents = [
            doc
            for docs in executor.map(process_html_file, filepaths)
            for doc in docs
        ]

    # Remove everything that is already known
    unique_documents, duplicate_count = deduplicate_documents(all_documents, existing_ris_filepath)

    # Write the new records; nothing is written when there are none
    if unique_documents:
        write_ris(unique_documents, output_ris_filepath)

    # Report
    print(f"处理了 {len(filepaths)} 个 HTML 文件，共提取 {len(all_documents)} 条文献。")
    print(f"发现 {duplicate_count} 条重复文献。")
    print(f"导出了 {len(unique_documents)} 条不重复文献到 {output_ris_filepath}。")

# Run the comparison only when this code is executed directly, not when it is imported.
if __name__ == "__main__":
    main()

处理了 1 个 HTML 文件，共提取 200 条文献。
发现 195 条重复文献。
导出了 5 条不重复文献到 ../../data/arxiv/html_result/unique_arxiv_results.ris。


In [1]:
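# 3. Deduplicate the entries, keeping the most detailed copy of each, and log which entries were removed. Deduplication runs in two passes: first by title, then by DOI. Known bug (noted by the author): '-' and space are not handled consistently when titles are compared.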
import uuid
import logging
import datetime
import re
from collections import defaultdict

def normalize_title(title):
    """Return a comparison key for *title*.

    The title is lower-cased and every run of non-alphanumeric characters
    (punctuation, whitespace and underscores) is removed. Dropping whitespace
    together with punctuation makes "Multi-Scale", "multi scale" and
    "multiscale" map to the same key; removing punctuation alone would turn
    the hyphenated form into "multiscale" while the spaced form stayed
    "multi scale", so the two would not be recognised as duplicates.
    """
    return re.sub(r'[\W_]+', '', title.lower())

def parse_ris_file(file_path):
    """Parse RIS file and return a list of entries.

    Each entry is a dict that maps a two-character RIS tag to the list of its
    values in file order (``TY`` is stored like any other tag; ``ER`` only
    terminates the entry and is not stored).

    Lines are matched against the RIS tag syntax rather than split on a fixed
    separator, so that:

    * a tag with an empty value (``"AB  - "``) is stored as ``''`` under its
      real tag instead of under the bogus tag ``"AB  -"``;
    * a line without a tag is appended, separated by one space, to the
      previous value (wrapped text) instead of becoming a tag of its own;
    * a ``TY`` line that follows an unterminated entry starts a new entry,
      and an unterminated entry at the end of the file is kept;
    * a UTF-8 byte-order mark at the start of the file is ignored.

    Args:
        file_path: Path of the RIS file (UTF-8, with or without BOM).

    Returns:
        list[dict[str, list[str]]]: The non-empty entries in file order.
    """
    # Two-character RIS tag, whitespace, a dash, then an optional value.
    tag_line = re.compile(r'^([A-Z][A-Z0-9])\s+-(?:\s+(.*))?$')

    entries = []
    current_entry = {}
    last_tag = None

    # "utf-8-sig" strips a leading BOM and reads BOM-less files unchanged.
    with open(file_path, 'r', encoding='utf-8-sig') as file:
        for raw_line in file:
            line = raw_line.strip()
            if not line:
                continue

            match = tag_line.match(line)
            if match is None:
                # Wrapped text: glue it onto the most recent value.
                if last_tag is not None:
                    values = current_entry[last_tag]
                    values[-1] = f"{values[-1]} {line}".strip()
                continue

            tag = match.group(1)
            if tag == 'ER' or (tag == 'TY' and current_entry):
                # End of entry, or a new entry begins although the previous
                # one was never closed with ER.
                if current_entry:
                    entries.append(current_entry)
                current_entry = {}
                last_tag = None
                if tag == 'ER':
                    continue

            value = (match.group(2) or '').strip()
            current_entry.setdefault(tag, []).append(value)
            last_tag = tag

    if current_entry:
        # The file ended without a closing ER line.
        entries.append(current_entry)
    return entries

def count_entry_fields(entry):
    """Return how many values *entry* holds in total, summed over all of its tags."""
    total = 0
    for values in entry.values():
        total += len(values)
    return total

def deduplicate_by_field(entries, field, normalize=False):
    """Collapse entries that share the same value of *field*.

    Entries are grouped by the first value of *field* (passed through
    ``normalize_title`` when *normalize* is true). From each group only the
    entry with the highest ``count_entry_fields`` is kept; on a tie the one
    that came first wins. Entries whose *field* is missing or empty are never
    merged and are appended after all groups.

    Returns:
        tuple[list, list[str]]: The surviving entries and one log message per
        removed entry.
    """
    def first_value(entry):
        """Return the first value of *field* in *entry*, or '' if absent."""
        return entry.get(field, [''])[0]

    groups = defaultdict(list)
    for entry in entries:
        raw_value = first_value(entry)
        if not raw_value:
            continue  # handled after the groups, below
        group_key = normalize_title(raw_value) if normalize else raw_value
        groups[group_key].append(entry)

    deduplicated = []
    log_messages = []

    for key, group in groups.items():
        if len(group) == 1:
            deduplicated.append(group[0])
            continue
        # Most detailed entry first; sorted() is stable, so ties keep file order.
        ranked = sorted(group, key=count_entry_fields, reverse=True)
        kept_entry, removed_entries = ranked[0], ranked[1:]
        deduplicated.append(kept_entry)
        log_messages.extend(
            f"Removed duplicate entry with {field} '{key}' "
            f"(kept {count_entry_fields(kept_entry)} fields, "
            f"removed {count_entry_fields(removed_entry)} fields, "
            f"title: '{removed_entry.get('TI', [''])[0]}')"
            for removed_entry in removed_entries
        )

    # Entries without the field pass through untouched, after the groups.
    deduplicated.extend(entry for entry in entries if not first_value(entry))

    return deduplicated, log_messages

def deduplicate_entries(entries):
    """Run both deduplication passes: normalized title (TI) first, then DOI (DO).

    Returns the surviving entries together with the log messages of the title
    pass followed by those of the DOI pass.
    """
    passes = (('TI', True), ('DO', False))
    collected_logs = []
    for field, use_normalization in passes:
        entries, pass_logs = deduplicate_by_field(entries, field, normalize=use_normalization)
        collected_logs.extend(pass_logs)
    return entries, collected_logs

def write_ris_file(entries, output_path):
    """Write *entries* to *output_path* in RIS format.

    Every value is written as ``TAG  - value``, each entry is closed with
    ``ER  -``, and consecutive entries are separated by one blank line (no
    blank line after the last entry). The file is overwritten.
    """
    with open(output_path, 'w', encoding='utf-8') as file:
        separator = ""  # nothing before the first entry
        for entry in entries:
            file.write(separator)
            for tag, values in entry.items():
                file.writelines(f"{tag}  - {value}\n" for value in values)
            file.write("ER  -\n")
            separator = "\n"  # blank line before every further entry

def setup_logging(log_dir='../../data/log'):
    """Send root-logger output (INFO and above) to a fresh, timestamped log file.

    The file is named ``deduplication_log_<YYYYmmdd_HHMMSS>.txt`` inside
    *log_dir* and is written as UTF-8, so non-ASCII titles can be logged
    regardless of the platform's default encoding.

    Unlike ``logging.basicConfig`` - which silently does nothing once the
    root logger already has a handler, e.g. when this cell is run a second
    time - the file handler is always installed. A handler installed by an
    earlier call is removed first, so messages are not duplicated into old
    log files; handlers installed by other code are left alone.

    Args:
        log_dir: Directory for the log file; created if it does not exist.

    Returns:
        str: Path of the log file that was configured.
    """
    import os  # local import: this cell does not import os at the top

    os.makedirs(log_dir, exist_ok=True)
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    log_path = os.path.join(log_dir, f'deduplication_log_{timestamp}.txt')

    root_logger = logging.getLogger()
    for handler in list(root_logger.handlers):
        # Only drop handlers that a previous setup_logging() call created.
        if getattr(handler, '_dedup_log_handler', False):
            root_logger.removeHandler(handler)
            handler.close()

    file_handler = logging.FileHandler(log_path, encoding='utf-8')
    file_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
    file_handler._dedup_log_handler = True  # marker used by the loop above
    root_logger.addHandler(file_handler)
    root_logger.setLevel(logging.INFO)
    return log_path

def main(input_file, output_file):
    """Deduplicate the RIS file *input_file* and write the result to *output_file*.

    Logging is configured first; the input is then parsed, deduplicated and
    written out. Afterwards every removal message plus a summary line is
    logged, and two status lines are printed.
    """
    setup_logging()

    # Parse -> deduplicate -> write
    entries = parse_ris_file(input_file)
    deduplicated_entries, log_messages = deduplicate_entries(entries)
    write_ris_file(deduplicated_entries, output_file)

    # Log the individual removals, then the totals
    for message in log_messages:
        logging.info(message)
    logging.info(
        "Processed %d entries, kept %d entries",
        len(entries),
        len(deduplicated_entries),
    )

    print(f"Deduplication complete. Output written to {output_file}")
    print("Log written to deduplication_log_*.txt")

# Script entry point: deduplicate the merged RIS file named below and write the
# result next to it under the same name with a "_dep" suffix.
if __name__ == "__main__":
    input_file = '../../data/all/6415_arxiv_results_multi_1640_tad_tal_3890_tad_tal_scopus_20250509_886_tad_tal_wos_20250509_885.ris'
    output_file = '../../data/all/6415_arxiv_results_multi_1640_tad_tal_3890_tad_tal_scopus_20250509_886_tad_tal_wos_20250509_885_dep.ris'
    main(input_file, output_file)

Deduplication complete. Output written to ../../data/all/6415_arxiv_results_multi_1640_tad_tal_3890_tad_tal_scopus_20250509_886_tad_tal_wos_20250509_885_dep.ris
Log written to deduplication_log_*.txt
