# Step1: 查询文件编码
* 在search_term.xlsx中替换自己的检索词  
* 设置自己的检索时间窗口，注意2月只有28天（闰年为29天），结束日期不要超出当月的实际天数
* 替换自己的登录链接
* 在search builder界面后再输入回车
* 最好不要一次搜索超过100页，即1million 条结果，factiva会设置随机的翻页上限

In [None]:
import pandas as pd
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait, Select
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException, TimeoutException, StaleElementReferenceException
import time
import openpyxl


def check_search_results():
    """Return the total number of hits reported on the current results page.

    Uses the module-level ``driver``.

    Returns:
        0 if the ``#headlines`` div contains the text "No search results",
        the integer that follows "of " in the ``resultsBar`` text otherwise,
        or -1 if anything raises (timeout, missing element, unparsable text).
    """
    try:
        # The headlines container must be in the DOM before anything is read.
        headlines_locator = (By.ID, 'headlines')
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located(headlines_locator)
        )

        empty_marker_xpath = "//div[@id='headlines' and contains(text(), 'No search results')]"
        if driver.find_elements(By.XPATH, empty_marker_xpath):
            return 0

        bar_text = driver.find_element(By.CLASS_NAME, 'resultsBar').text
        # Take the first token after "of " and drop thousands separators.
        after_of = bar_text.split('of ')[1]
        count_token = after_of.split()[0]
        return int(count_token.replace(',', ''))

    except Exception as e:
        print(f"检查搜索结果时出错: {e}")
        return -1

def extract_data():
    """Collect the accession numbers listed on the current results page.

    Uses the module-level ``driver``.

    Returns:
        A list of ``(main_accno, duplicate_accno)`` tuples. Every headline row
        with a ``data-accno`` attribute contributes ``(main_accno, '')``,
        followed by one ``(main_accno, dup_accno)`` tuple per headline row
        found inside its first ``div`` whose id ends in ``_dedup``.
        On any exception the page is refreshed after a 5 s pause and an empty
        list is returned (the caller is not asked to retry).
    """
    try:
        WebDriverWait(driver, 1).until(
            EC.presence_of_element_located((By.ID, 'headlines'))
        )

        rows = driver.find_elements(By.CSS_SELECTOR, '#headlines > table > tbody > tr.headline')

        pairs = []
        for headline_row in rows:
            accno = headline_row.get_attribute('data-accno')
            if not accno:
                continue
            pairs.append((accno, ''))

            # Duplicates, when present, sit in a nested "<id>_dedup" container.
            dedup_boxes = headline_row.find_elements(By.CSS_SELECTOR, 'div[id$="_dedup"]')
            if not dedup_boxes:
                continue

            for nested_row in dedup_boxes[0].find_elements(By.CSS_SELECTOR, 'tr.headline'):
                nested_accno = nested_row.get_attribute('data-accno')
                if nested_accno:
                    pairs.append((accno, nested_accno))

        return pairs

    except Exception as e:
        print(f"提取数据时出错: {e},重试")
        time.sleep(5)
        driver.refresh()
        return []

def click_next_page():
    """Click the "next page" link of the results list.

    Uses the module-level ``driver``. The link (``a.nextItem``) is scrolled
    into view and clicked through JavaScript, then the function sleeps 1 s.

    Returns:
        True after a click was issued; False when the link is missing or not
        clickable within 2 s, or when it went stale on both attempts.
    """
    max_attempts = 2
    attempt = 0
    while attempt < max_attempts:
        try:
            next_link = WebDriverWait(driver, 2).until(
                EC.element_to_be_clickable((By.CSS_SELECTOR, 'a.nextItem'))
            )
            driver.execute_script("arguments[0].scrollIntoView(true);", next_link)
            driver.execute_script("arguments[0].click();", next_link)
            time.sleep(1)
            return True

        except StaleElementReferenceException:
            # The element was replaced between lookup and click; look it up again
            # unless this was already the last allowed attempt.
            if attempt >= max_attempts - 1:
                print("多次尝试后仍无法点击下一页按钮")
                return False
            print(f"StaleElementReferenceException，重试第 {attempt + 2} 次")
            time.sleep(1)

        except (NoSuchElementException, TimeoutException):
            print("没有下一页或无法点击下一页按钮")
            return False

        attempt += 1

def start_next_query():
    """Click the "Modify Search" button on the results page.

    Uses the module-level ``driver``; waits up to 2 s for the button to become
    clickable. Any exception is printed and swallowed, and nothing is returned.
    """
    try:
        button_locator = (By.XPATH, '//*[@id="btnModifySearch"]/div/span')
        wait = WebDriverWait(driver, 2)
        wait.until(EC.element_to_be_clickable(button_locator)).click()

    except Exception as e:
        print(f"开始新搜索时出错: {e}")

def set_date_range(start_day, start_month, start_year, end_day, end_month, end_year):
    """Select the "Custom" date range and fill in its six date fields.

    Uses the module-level ``driver``. The fields are located by name:
    ``frd``/``frm``/``fry`` for the start date and ``tod``/``tom``/``toy`` for
    the end date. Each field is cleared before the value is typed.

    Any exception is printed and swallowed, so the caller is not told when the
    range could not be set.
    """
    def fill(field_name, value):
        # Clear whatever the form currently holds, then type the new value.
        field = driver.find_element(By.NAME, field_name)
        field.clear()
        field.send_keys(value)

    try:
        range_dropdown = WebDriverWait(driver, 5).until(
            EC.presence_of_element_located((By.NAME, 'dr'))
        )
        Select(range_dropdown).select_by_value('Custom')

        for field_name, value in (('frd', start_day), ('frm', start_month), ('fry', start_year)):
            fill(field_name, value)

        # Pause between the start and end fields (kept from the original;
        # presumably gives the form time to settle -- confirm).
        time.sleep(1)

        for field_name, value in (('tod', end_day), ('tom', end_month), ('toy', end_year)):
            fill(field_name, value)

        print(f"日期范围设置为 {start_day}/{start_month}/{start_year} - {end_day}/{end_month}/{end_year}")
    except Exception as e:
        print(f"设置日期范围时出错: {e}")

# Step 1 driver script: run every search listed in the input workbook and save
# the accession numbers (plus their duplicates) to an Excel file.
#
# Input workbook: one search per row. Columns read below: SearchTerms,
# StartDay, StartMonth, StartYear, EndDay, EndMonth, EndMYear.
# Replace the path with your own file.
input_file_path = "search_term.xlsx"
df = pd.read_excel(input_file_path)
output_file_path = "search_results_with_duplicates.xlsx"

# NOTE: driver_path is not used -- webdriver.Chrome() below is created without
# a Service object, so editing this value has no effect.
driver_path = 'path/to/chromedriver'  # replace with your ChromeDriver path
driver = webdriver.Chrome()

# Replace with your own login page.
driver.get('https://libguides')

# Block until the user has logged in and opened the Search Builder page.
input("请打开'factiva 主页>搜索>搜索建设'页面后，按Enter键继续...")

# 0-based index of the first input row to process (same convention as Step 3).
# 0 starts a fresh run and overwrites any existing output file; a larger value
# resumes: earlier rows are skipped and the existing output file is extended.
start_row = 0
first_index = max(start_row, 0)  # a negative value would slice from the end

# Single source of truth for the output schema. 'accno' and 'accno_duplicate'
# are the columns Step 2 reads.
output_columns = [
    'search_term', 'search_win', 'start_day', 'start_month', 'start_year',
    'total_results', 'extracted_accnos_count', 'accno', 'accno_duplicate', 'note',
]

if first_index == 0:
    output_df = pd.DataFrame(columns=output_columns)
else:
    # Resuming: keep what earlier runs already saved.
    try:
        output_df = pd.read_excel(output_file_path)
    except FileNotFoundError:
        print("未找到现有的输出文件，将创建新文件。")
        output_df = pd.DataFrame(columns=output_columns)

try:
    for index, row in df.iloc[first_index:].iterrows():
        print(f"\n正在处理第 {index + 1} 行，共 {len(df)} 行")

        query = row['SearchTerms']
        print("* 检索内容:", query)

        start_day = row['StartDay']
        start_month = row['StartMonth']
        start_year = row['StartYear']

        end_day = row['EndDay']
        end_month = row['EndMonth']
        # NOTE(review): 'EndMYear' looks like a typo for 'EndYear'. It is kept
        # because it must match the header of the input workbook -- confirm.
        end_year = row['EndMYear']

        print(f"# 检索时间:\n{start_day}/{start_month}/{start_year} to {end_day}/{end_month}/{end_year}")

        note = ""
        total_results = 0
        # Declared before the try so pages scraped before a failure are kept.
        all_data = []

        try:
            if index == first_index:
                # First query of this run: the browser is on the Search Builder
                # page, so the query is typed straight into the editor.
                set_date_range(start_day, start_month, start_year, end_day, end_month, end_year)

                search_box = WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.CLASS_NAME, 'ace_text-input'))
                )
                search_box.clear()
                # Drop one trailing ']' from the typed text only (presumably the
                # editor auto-inserts the closing bracket while typing --
                # confirm). `query` itself stays a plain string so the saved
                # search term matches the input workbook.
                typed_query = query[:-1] if query.endswith(']') else query

                print("* 输入检索内容://///////", typed_query)
                search_box.send_keys(typed_query)

                search_box.send_keys(Keys.RETURN)

            else:
                # Later queries: go back from the results page via
                # "Modify Search" and replace the editor content.
                modify_search_button = WebDriverWait(driver, 10).until(
                    EC.element_to_be_clickable((By.XPATH, '//*[@id="btnModifySearch"]/div/span'))
                )
                modify_search_button.click()

                time.sleep(0.5)

                search_box = WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.CLASS_NAME, 'ace_text-input'))
                )
                print("找到输入框")

                time.sleep(1)
                set_date_range(start_day, start_month, start_year, end_day, end_month, end_year)
                print("时间输入完成")

                WebDriverWait(driver, 60).until(
                    EC.presence_of_element_located((By.ID, "editor"))
                )

                # Replace the editor content through the ACE API instead of typing.
                clear_and_set_content_js = """
                var editor = ace.edit("editor");
                editor.setValue("");
                editor.insert(arguments[0]);
                """
                driver.execute_script(clear_and_set_content_js, query)

                # Give the editor a moment to apply the new content.
                time.sleep(1)

                search_box = WebDriverWait(driver, 1).until(
                    EC.presence_of_element_located((By.CLASS_NAME, 'ace_text-input'))
                )
                search_box.send_keys(Keys.RETURN)

            # Fixed wait for the results page to load.
            time.sleep(10)

            total_results = check_search_results()

            if total_results == 0:
                print(f"'{query}'没有搜索结果")
                note = "No search results"

            elif total_results > 0:
                print(f"'{query}'搜索结果总数: {total_results}")

                page_number = 1
                while True:
                    print(f"正在处理第 {page_number} 页")
                    time.sleep(20)

                    page_data = extract_data()
                    all_data.extend(page_data)

                    print(f"第 {page_number} 页提取的data-accno数量: {len(page_data)}")

                    if not click_next_page():
                        break

                    page_number += 1

                print(f"\n'{query}'搜索提取的所有data-accno值:")
                print(f"\n'{query}'搜索总共提取的data-accno数量: {len(all_data)}")

            else:
                # check_search_results() returns -1 when it could not read the page.
                print(f"'{query}'检查搜索结果时出错")
                note = "Error checking search results"

        except Exception as e:
            print(f"处理查询时发生错误: {e}")
            note = f"Error: {str(e)}"

        if total_results > 0 and not all_data and not note:
            # Hits were reported but nothing was scraped; make that visible
            # instead of silently writing no row for this query.
            note = "No accession numbers extracted"

        # Fields shared by every output row of this query.
        base_row = {
            'search_term': query,
            'search_win': start_year,
            'start_day': start_day,
            'start_month': start_month,
            'start_year': start_year,
            'total_results': total_results,
            'extracted_accnos_count': len(all_data),
            'note': note,
        }
        if all_data:
            rows = [
                {**base_row, 'accno': main_accno, 'accno_duplicate': duplicate_accno}
                for main_accno, duplicate_accno in all_data
            ]
        else:
            # One placeholder row so every query leaves a trace in the output.
            rows = [{**base_row, 'accno': '', 'accno_duplicate': ''}]
        new_rows = pd.DataFrame(rows, columns=output_columns)

        output_df = pd.concat([output_df, new_rows], ignore_index=True)

        # Save after every query so an interrupted run loses at most one query.
        output_df.to_excel(output_file_path, index=False)
        print(f"已将第 {index + 1} 行的结果保存到Excel文件")

        if index != len(df) - 1:
            time.sleep(5)
finally:
    # Close the browser even when the run is interrupted or fails.
    driver.quit()


# Step2: 生成文件链接
* 替换url链接中自己账号分享链接（任意在factiva中打开一篇文章，点击: 分享-同一账号内分享，即可得到自己的分享链接，然后把分享链接分段替换至代码中的base_url部分）  
（如文件编号为DJDN000020210102eh120000k，分享链接：https://global.factiva.com/redir/default.aspx?P=sa&NS=16&AID=9AUS001400&an=DJDN000020210102eh120000k&drn=drn%3aarchive.newsarticle.DJDN000020210102eh120000k&cat=a&ep=ASI， 替换除DJDN000020210102eh120000k以外部分）
* 使用上一步生成的文件，替换路径


In [None]:
import pandas as pd

# Step 2 script: turn every accession number from Step 1 into an article URL.

# Workbook produced by Step 1.
file_path = "search_results_with_duplicates.xlsx"
df = pd.read_excel(file_path)

# Use the duplicate's accession number when the row has one, else the main one.
df['accno_filled'] = df['accno_duplicate'].fillna(df['accno'])

# Build the URL column. Replace base_url and the trailing parameters with the
# parts of your own account's share link.
base_url = "https://global.factiva.com/redir/default.aspx?P=sa&NS=16&AID=9AUS001400&an="
# na_action='ignore' leaves the URL empty for rows without an accession number
# (e.g. "No search results" rows) instead of building a link containing "nan".
df['url'] = df['accno_filled'].map(
    lambda accno: f"{base_url}{accno}&drn=drn%3aarchive.newsarticle.{accno}&cat=a&ep=ASI",
    na_action='ignore',
)

# Save to a new Excel file.
output_path = "search_results_with_urls.xlsx"
df.to_excel(output_path, index=False)

print(f"文件已保存到 {output_path}")


文件已保存到 E:\factiva_news\factiva code\search_results_with_urls.xlsx


# Step3: 下载并保存新闻为csv文件
* 替换网页链接为你自己的登录页面
* 输入开始行
* 使用上一步生成的链接文件
* 设置开始行数

In [None]:
import pandas as pd
import os
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time

# Step 3 script: open every article URL and append its fields to a CSV file.

# Link workbook produced by Step 2; replace with your own path if it differs.
file_path = "search_results_with_urls.xlsx"

# CSV file the scraped articles are appended to. It must not share a name with
# the Step 2 workbook: rows are written with to_csv, which would corrupt it.
output_file = "search_results_news.csv"

# Column order of the CSV; every appended row is forced into this order.
columns = ['accno', 'site', 'title', 'wc', 'sn', 'pub', 'pd', 'full_text', 'note']

# Seconds to wait after the 1st..5th failed attempt; the 6th failure is final.
# The trailing 0 keeps the original behaviour of one last immediate retry.
retry_delays = (5, 10, 60, 120, 0)
max_attempts = len(retry_delays) + 1

# Load the links before the browser starts, so a wrong path fails fast and the
# row count shown in the prompt below belongs to this file.
df = pd.read_excel(file_path)

driver = webdriver.Chrome()

try:
    # Replace with your own login page.
    driver.get('https://libguides')

    # Block until the user has logged in.
    input("请完成登录后按Enter键继续...")

    # Row to start from, so an interrupted run can be resumed.
    start_row = input(f"总共有 {len(df)} 行数据。请输入开始行号（从0开始，默认0）: ")

    try:
        start_row = int(start_row)
        if start_row < 0 or start_row >= len(df):
            start_row = 0
    except ValueError:
        start_row = 0

    print(f"将从第 {start_row} 行开始处理")

    # Write the header once; later runs only append rows.
    if not os.path.exists(output_file):
        pd.DataFrame(columns=columns).to_csv(output_file, index=False)

    for index, row in df.iloc[start_row:].iterrows():
        print(f"\n正在处理第 {index + 1}/{len(df)} 行")

        accno = row['accno']
        site = row['url']
        url = site

        # Start from an all-empty record; it is only filled in on success.
        result = {column: '' for column in columns}
        result['accno'] = accno
        result['site'] = site

        if pd.isna(accno) or pd.isna(url):
            # Rows without an accession number (e.g. searches with no results)
            # have nothing to download; skip the slow retry ladder.
            print("缺少accno或url，跳过")
            result['note'] = "Skipped: missing accno or url"
        else:
            for attempt in range(1, max_attempts + 1):
                try:
                    driver.get(url)

                    # Switch the display to the full article with indexing fields.
                    display_options = WebDriverWait(driver, 10).until(
                        EC.element_to_be_clickable((By.XPATH, '//*[@id="ViewTab"]/a'))
                    )
                    display_options.click()

                    full_article = WebDriverWait(driver, 10).until(
                        EC.element_to_be_clickable((By.XPATH, "//a[contains(text(), 'Full Article/Report plus Indexing')]"))
                    )
                    full_article.click()

                    title_element = WebDriverWait(driver, 10).until(
                        EC.presence_of_element_located((By.XPATH, "//span[@class='enHeadline']"))
                    )
                    title_text = title_element.text
                    print("标题", title_text)

                    # Each metadata value is the cell next to its bold label.
                    wc_text = driver.find_element(By.XPATH, "//td[b='WC']/following-sibling::td").text
                    sn_text = driver.find_element(By.XPATH, "//td[b='SN']/following-sibling::td").text
                    pub_text = driver.find_element(By.XPATH, "//td[b='PUB']/following-sibling::td").text
                    pd_text = driver.find_element(By.XPATH, "//td[b='PD']/following-sibling::td").text
                    print("日期", pd_text)

                    # Every paragraph is followed by a '.' separator.
                    paragraphs = driver.find_elements(By.CLASS_NAME, 'articleParagraph')
                    full_text = ''.join(paragraph.text + '.' for paragraph in paragraphs)
                    print("正文获取成功")

                    result.update({
                        'title': title_text,
                        'wc': wc_text,
                        'sn': sn_text,
                        'pub': pub_text,
                        'pd': pd_text,
                        'full_text': full_text,
                    })
                    print("添加到results")
                    break

                except Exception as e:
                    print(f"{url}这篇新闻第 {attempt} 次打开失败")

                    if attempt == max_attempts:
                        # Out of attempts: keep the empty record and note why.
                        print("最后一次尝试失败")
                        result['note'] = f"{url}这篇新闻，在{max_attempts}次尝试后依然失败: {str(e)}"
                    else:
                        delay = retry_delays[attempt - 1]
                        if delay:
                            print(f"等待{delay}秒后重新打开URL")
                            time.sleep(delay)

        # columns= pins the field order to the CSV header written above.
        result_df = pd.DataFrame([result], columns=columns)
        result_df.to_csv(output_file, mode='a', header=False, index=False)
        print(f"第 {index + 1}/{len(df)} 行数据已写入文件")
        time.sleep(1)
finally:
    # Close the browser even when the run is interrupted or fails.
    driver.quit()