读取 CSV 数据，确保日期格式正确。

按 cosit 和月份分组（按月聚合），记录每个 cosit 在每个月内的最早日期和最晚日期。

只保留起止日期均落在 2023-2025 年的月度记录。

在筛选后的数据集 subset_df 中添加 Tried 和 Retrieved 两列，初始值设为 0。

In [7]:
#引入库
import os
import time
import glob
import shutil
import random
import requests
import subprocess
import logging
import pandas as pd
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager

In [31]:
# Configure the root logger: INFO and above, formatted as "<time> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

In [None]:
import pandas as pd

# Load the daily observation rows (CSV) and the id -> cosit lookup table (Excel).
ccs_cosit_date_sorted = pd.read_csv("ccs_cosit_date_sorted.csv")

# Read cosit from the Excel lookup as text (not numbers).
cosit_and_id_for_ccs_dot = pd.read_excel("cosit_and_id_for_ccs_dot.xlsx", dtype={"cosit": str})

# Replace the CSV's own cosit with the one looked up by id. The CSV column is dropped
# first (if present), so the merge yields a plain 'cosit' column (appended last) instead
# of cosit_x / cosit_y. This works whether or not the CSV has a cosit column.
# NOTE(review): if an id occurs more than once in the lookup, the left merge duplicates
# that id's rows -- confirm ids are unique in cosit_and_id_for_ccs_dot.xlsx.
merged_data = pd.merge(
    ccs_cosit_date_sorted.drop(columns=['cosit'], errors='ignore'),
    cosit_and_id_for_ccs_dot[['id', 'cosit']],
    on='id',
    how='left',
)

# Ids with no lookup match get NaN. astype(str) would turn NaN into the literal string
# "nan", which later flows into report URLs as a bogus cosit, so drop those rows explicitly.
missing_cosit = merged_data['cosit'].isna()
if missing_cosit.any():
    print(f"Dropping {int(missing_cosit.sum())} rows whose id has no cosit in the lookup table.")
    merged_data = merged_data[~missing_cosit].reset_index(drop=True)

# Store cosit as strings.
merged_data['cosit'] = merged_data['cosit'].astype(str)

# Preview the merged data (optional).
print(merged_data.head())

                                     id    day        date         cosit
0  573292FF-2F4B-4721-92BB-B2278A482D0F  13882  2008-01-04  000000010183
1  573292FF-2F4B-4721-92BB-B2278A482D0F  13883  2008-01-05  000000010183
2  573292FF-2F4B-4721-92BB-B2278A482D0F  13884  2008-01-06  000000010183
3  573292FF-2F4B-4721-92BB-B2278A482D0F  13885  2008-01-07  000000010183
4  573292FF-2F4B-4721-92BB-B2278A482D0F  13886  2008-01-08  000000010183


In [None]:
# 修改了聚合方式：按月聚合
import pandas as pd
import os

# Aggregate the daily rows to one row per (cosit, calendar month).
# Work on a copy so parsing 'date' does not silently mutate merged_data.
df = merged_data.copy()

# Parse the date column into datetimes.
df['date'] = pd.to_datetime(df['date'])

# Group by cosit and calendar month. The min/max of 'date' give the span actually
# observed in that month, which need not start on the 1st or end on the last day.
month = df['date'].dt.to_period('M').rename('month')
contiguous_df = (
    df.groupby(['cosit', month])['date']
    .agg(begin_date='min', end_date='max')
    .reset_index(level='month', drop=True)
    .reset_index()
)

# Keep only months whose begin and end dates both fall in 2023-2025.
in_range = (
    contiguous_df['begin_date'].dt.year.between(2023, 2025)
    & contiguous_df['end_date'].dt.year.between(2023, 2025)
)
subset_df = contiguous_df[in_range].copy()

# Download-tracking flags used by the scraping cells, both starting at 0.
subset_df['Tried'] = 0
subset_df['Retrieved'] = 0

In [11]:
# Write the filtered monthly work list to Excel; the DataFrame index is not written.
subset_df.to_excel(
    "subset_ccs_cosit_date_sorted_by_month_exact.xlsx",
    index=False,
    engine='openpyxl',
)

## 从这里开始跑

通过已下载的文件名更新下载记录。

In [1]:
import pandas as pd
import os
import re

# Read the main work-list workbook, keeping cosit as text.
excel_file = "subset_ccs_cosit_date_sorted_by_month_exact_feb261013am.xlsx"
df_main = pd.read_excel(excel_file, dtype={"cosit": str})

# Folder holding the renamed reports, named "<cosit>_<begin>_<end>.xls".
xls_folder = "./NamedReports/"

# Every .xls file name in that folder.
xls_files = [f for f in os.listdir(xls_folder) if f.endswith(".xls")]

# Parse (cosit, begin_date, end_date) out of each file name; other names are ignored.
data_records = []
pattern = r"(\d+)_([\d-]+)_([\d-]+)\.xls"

for filename in xls_files:
    match = re.match(pattern, filename)
    if match:
        cosit, begin_date, end_date = match.groups()
        data_records.append({"cosit": cosit, "begin_date": begin_date, "end_date": end_date})

# Explicit columns: with no matching files, a column-less DataFrame would make the
# key lookups below fail with KeyError.
df_downloaded = pd.DataFrame(data_records, columns=["cosit", "begin_date", "end_date"])

# Normalise the key columns to strings so they compare equal to the file-name parts.
# Datetime columns are formatted explicitly as YYYY-MM-DD, so a time component in any
# cell cannot change the text form of the whole column.
for col in ["begin_date", "end_date"]:
    if pd.api.types.is_datetime64_any_dtype(df_main[col]):
        df_main[col] = df_main[col].dt.strftime("%Y-%m-%d")
    else:
        df_main[col] = df_main[col].astype(str)
df_main["cosit"] = df_main["cosit"].astype(str)

# The "Retrieved" column must already exist.
if "Retrieved" not in df_main.columns:
    raise ValueError("主 Excel 文件中不存在 'Retrieved' 列，请检查数据格式！")

# Set Retrieved = 1 for rows whose report file exists; all other rows keep their value.
# A set of key tuples gives O(1) membership per row instead of scanning df_downloaded per row.
downloaded_keys = set(zip(df_downloaded["cosit"], df_downloaded["begin_date"], df_downloaded["end_date"]))
is_downloaded = pd.Series(
    [key in downloaded_keys for key in zip(df_main["cosit"], df_main["begin_date"], df_main["end_date"])],
    index=df_main.index,
    dtype=bool,
)
df_main.loc[is_downloaded, "Retrieved"] = 1

# Save the updated workbook.
updated_file = "./updated_excel.xlsx"
df_main.to_excel(updated_file, index=False)

print(f"更新后的文件已保存到: {updated_file}")

更新后的文件已保存到: ./updated_excel.xlsx


# Add Failed Downloads Back to the Excel Sheet for Re-scraping

In [3]:
import pandas as pd
import re

# Load the work-list workbook (cosit read as text) and processing_log.xlsx.
updated_excel = pd.read_excel(io="updated_excel.xlsx", dtype={'cosit': str})
processing_log = pd.read_excel(io="processing_log.xlsx")

# Parse a report file name into its cosit / begin_date / end_date parts.
def extract_info(file_name):
    """Split ``<cosit>_<YYYY-MM-DD>_<YYYY-MM-DD>.xls`` into a tuple of three strings.

    Args:
        file_name: value from the ``File_Name`` column. Empty Excel cells arrive as
            NaN (a float), so non-strings are accepted and treated as non-matching.

    Returns:
        ``(cosit, begin_date, end_date)`` on a match, else ``(None, None, None)``.
    """
    # re.match raises TypeError on non-strings such as NaN; treat them as "no match".
    if not isinstance(file_name, str):
        return None, None, None
    match = re.match(r"(\d+)_(\d{4}-\d{2}-\d{2})_(\d{4}-\d{2}-\d{2})\.xls", file_name)
    if match:
        return match.groups()
    return None, None, None

# Split each logged File_Name into its (cosit, begin_date, end_date) key.
processing_log[['cosit', 'begin_date', 'end_date']] = processing_log['File_Name'].apply(lambda x: pd.Series(extract_info(x)))

# Key columns as strings; unparseable names become the string "None" and never match.
processing_log['cosit'] = processing_log['cosit'].astype(str)
processing_log['begin_date'] = processing_log['begin_date'].astype(str)
processing_log['end_date'] = processing_log['end_date'].astype(str)

# Keep only Failed entries. If the same file is logged as Failed more than once,
# duplicate keys would make the left merge below duplicate the matching rows of
# updated_excel, so each key is kept once.
failed_logs = (
    processing_log.loc[processing_log['Status'] == 'Failed', ['cosit', 'begin_date', 'end_date']]
    .drop_duplicates()
)

# Normalise updated_excel's key columns to the same string forms. Datetime columns are
# formatted explicitly so a time component cannot change the whole column's text.
updated_excel['cosit'] = updated_excel['cosit'].astype(str)
for col in ['begin_date', 'end_date']:
    if pd.api.types.is_datetime64_any_dtype(updated_excel[col]):
        updated_excel[col] = updated_excel[col].dt.strftime('%Y-%m-%d')
    else:
        updated_excel[col] = updated_excel[col].astype(str)

# Left-merge with an indicator so rows that appear in the failed log can be flagged.
merged = updated_excel.merge(failed_logs, on=['cosit', 'begin_date', 'end_date'], how='left', indicator=True)

# Reset Tried and Retrieved to 0 for rows matching a failed download.
merged.loc[merged['_merge'] == 'both', ['Tried', 'Retrieved']] = 0

# The indicator column is no longer needed.
merged.drop(columns=['_merge'], inplace=True)

# Save the result as a new workbook.
merged.to_excel("updated_excel_add_failed_data.xlsx", index=False)

# 新数据更新文件
主要修改点：
仅更新 Retrieved 和 Tried 列：

之前的代码只更新 Retrieved，现在 Tried 也同步更新。
只有当文件存在时，Retrieved 和 Tried 才设为 1，否则保持原值。
检查 Retrieved 和 Tried 是否存在：

如果这两列在 Excel 中不存在，则创建，并初始化为 0。
使用 apply 更新数据：

update_flags() 函数检查 df_downloaded 是否包含对应的 cosit、begin_date 和 end_date 组合，如果存在，则 Retrieved=1, Tried=1，否则保持原值。
运行结果：
代码执行后，updated_excel.xlsx 只会更新那些成功下载的文件对应的行，不会影响其他数据。

In [None]:
import pandas as pd
import os
import re

# Read the main work-list workbook, keeping cosit as text.
excel_file = "updated_excel.xlsx"
df_main = pd.read_excel(excel_file, dtype={"cosit": str})

# Folder holding the renamed reports, named "<cosit>_<begin>_<end>.xls".
xls_folder = "./NamedReports/"

# Every .xls file name in that folder.
xls_files = [f for f in os.listdir(xls_folder) if f.endswith(".xls")]

# Parse (cosit, begin_date, end_date) out of each file name; other names are ignored.
data_records = []
pattern = r"(\d+)_([\d-]+)_([\d-]+)\.xls"

for filename in xls_files:
    match = re.match(pattern, filename)
    if match:
        cosit, begin_date, end_date = match.groups()
        data_records.append({"cosit": cosit, "begin_date": begin_date, "end_date": end_date})

# Explicit columns: with no matching files, a column-less DataFrame would make the
# key lookups below fail with KeyError.
df_downloaded = pd.DataFrame(data_records, columns=["cosit", "begin_date", "end_date"])

# Normalise the key columns to strings so they compare equal to the file-name parts.
# Datetime columns are formatted explicitly as YYYY-MM-DD, so a time component in any
# cell cannot change the text form of the whole column.
for col in ["begin_date", "end_date"]:
    if pd.api.types.is_datetime64_any_dtype(df_main[col]):
        df_main[col] = df_main[col].dt.strftime("%Y-%m-%d")
    else:
        df_main[col] = df_main[col].astype(str)
df_main["cosit"] = df_main["cosit"].astype(str)

# Make sure the "Retrieved" and "Tried" columns exist, creating them as 0 if needed.
for col in ["Retrieved", "Tried"]:
    if col not in df_main.columns:
        df_main[col] = 0

# Set Retrieved = Tried = 1 for rows whose report file exists; other rows keep their values.
# A set of key tuples gives O(1) membership per row instead of scanning df_downloaded per row.
downloaded_keys = set(zip(df_downloaded["cosit"], df_downloaded["begin_date"], df_downloaded["end_date"]))
is_downloaded = pd.Series(
    [key in downloaded_keys for key in zip(df_main["cosit"], df_main["begin_date"], df_main["end_date"])],
    index=df_main.index,
    dtype=bool,
)
df_main.loc[is_downloaded, ["Retrieved", "Tried"]] = 1

# Save the updated workbook (overwrites the input file).
updated_file = "./updated_excel.xlsx"
df_main.to_excel(updated_file, index=False)

print(f"更新后的文件已保存到: {updated_file}")

更新后的文件已保存到: ./updated_excel.xlsx


In [1]:
#引入库
import os
import time
import glob
import shutil
import random
import requests
import subprocess
import logging
import pandas as pd
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
# Reload the work list, reading cosit as text rather than numbers, and preview it.
subset_df = pd.read_excel(io="./updated_excel.xlsx", dtype={"cosit": str})
subset_df.head()

Unnamed: 0,cosit,begin_date,end_date,Tried,Retrieved
0,10183,2023-01-01,2023-01-31,0,0
1,10183,2023-02-01,2023-02-28,0,0
2,10183,2023-03-01,2023-03-31,0,0
3,10183,2023-04-01,2023-04-30,0,0
4,10183,2023-05-01,2023-05-31,0,0


In [2]:
# Parse both date columns as YYYY-MM-DD; values that do not parse become NaT.
for _date_col in ("begin_date", "end_date"):
    subset_df[_date_col] = pd.to_datetime(subset_df[_date_col], errors='coerce', format='%Y-%m-%d')

In [3]:
# Fetch the Mullvad VPN relays located in selected countries.
def get_mullvad_servers(countries=None):
    """Return the (country, city) pairs of Mullvad relays in the given countries.

    Runs ``mullvad relay list`` directly (no shell pipeline). From each output line it
    takes the first whitespace-separated token and keeps it if it looks like a relay
    hostname ``<country>-<x>-<y>-<z>`` whose country prefix is in *countries*.

    Args:
        countries: iterable of country-code prefixes; defaults to
            ``['us', 'ca', 'gb', 'de', 'jp']``.

    Returns:
        A de-duplicated DataFrame with columns ``["Country", "City Code"]``. An empty
        DataFrame with those columns is returned when the CLI is missing or fails, or
        when no relay matches.
    """
    import re

    if countries is None:
        countries = ['us', 'ca', 'gb', 'de', 'jp']
    countries = list(countries)
    columns = ["Country", "City Code"]
    if not countries:
        return pd.DataFrame(columns=columns)

    # Same shape as the former grep filter: ^(cc|cc|...)-[a-z0-9]+-[a-z0-9]+-[a-z0-9]+$
    relay_re = re.compile(
        r"(?:%s)-[a-z0-9]+-[a-z0-9]+-[a-z0-9]+" % "|".join(re.escape(c) for c in countries)
    )

    try:
        result = subprocess.run(
            ["mullvad", "relay", "list"],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True,
        )
    except (OSError, subprocess.SubprocessError) as e:
        # e.g. the mullvad CLI is not installed
        logging.error(f"Error retrieving Mullvad server list: {e}")
        return pd.DataFrame(columns=columns)

    if result.returncode != 0:
        logging.error("Error fetching Mullvad relay list. Ensure Mullvad VPN is installed and running.")
        return pd.DataFrame(columns=columns)

    parsed_servers = []
    for line in result.stdout.splitlines():
        tokens = line.split()
        if tokens and relay_re.fullmatch(tokens[0]):
            country, city = tokens[0].split("-")[:2]
            parsed_servers.append((country, city))

    if not parsed_servers:
        logging.warning(f"No Mullvad relays found for countries: {countries}")

    return pd.DataFrame(parsed_servers, columns=columns).drop_duplicates()

# Get the device's current external (public) IP address.
def get_external_ip(timeout=10):
    """Return the current external IP address as a string, or ``None`` on failure.

    Fetches https://api.ipify.org with ``curl``. ``--max-time`` plus a subprocess
    timeout bound the call, so an unreachable network cannot block the caller
    indefinitely. ``--fail`` makes HTTP error responses count as failures instead of
    returning the error body as if it were an address.

    Args:
        timeout: maximum seconds for the HTTP request (default 10).
    """
    command = ["curl", "-sS", "--fail", "--max-time", str(timeout), "https://api.ipify.org"]
    try:
        result = subprocess.run(
            command,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True,
            timeout=timeout + 5,  # hard stop in case curl itself does not exit
        )
    except subprocess.TimeoutExpired:
        logging.error(f"Timed out after {timeout}s while getting external IP.")
        return None
    except OSError as e:
        # e.g. curl is not installed
        logging.error(f"Error while getting external IP: {e}")
        return None

    ip = result.stdout.strip()
    if result.returncode != 0 or not ip:
        logging.error(f"Failed to retrieve external IP: {result.stderr}")
        return None
    return ip

# Switch to a random Mullvad VPN server and make sure the external IP changes.
def change_mullvad_server(df_servers, max_attempts=10):
    """Reconnect Mullvad to random servers from *df_servers* until the external IP changes.

    Each attempt samples one row, disconnects, sets the relay location to that row's
    ``Country`` / ``City Code``, connects, logs ``mullvad status``, waits 3 seconds and
    re-reads the external IP. A failed IP lookup (``None``) counts as a failed attempt
    and is retried, rather than ending the loop early.

    Args:
        df_servers: DataFrame with ``Country`` and ``City Code`` columns, such as the
            one returned by ``get_mullvad_servers()``.
        max_attempts: maximum number of reconnect attempts (default 10).

    Returns:
        True once a different external IP is observed. False when *df_servers* is empty
        or every attempt is used up.
    """
    if df_servers.empty:
        logging.error("No servers available to connect.")
        return False

    old_ip = get_external_ip()
    if old_ip:
        logging.info(f"Current IP: {old_ip}")
    else:
        logging.warning("Could not retrieve current IP. Proceeding anyway.")

    for attempt in range(1, max_attempts + 1):
        # Randomly select a server
        selected_server = df_servers.sample(n=1).iloc[0]
        country = selected_server["Country"]
        city = selected_server["City Code"]

        logging.info("Disconnecting Mullvad VPN...")
        subprocess.run(["mullvad", "disconnect"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        time.sleep(1)

        logging.info(f"Connecting to Mullvad VPN at {city} ({country})...")
        subprocess.run(["mullvad", "relay", "set", "location", country, city], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        subprocess.run(["mullvad", "connect"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        # Confirm VPN connection
        status = subprocess.run(["mullvad", "status"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        logging.info(f"VPN Status: {status.stdout.strip()}")

        # Give the tunnel a moment before checking the new external IP.
        time.sleep(3)
        new_ip = get_external_ip()

        if new_ip and new_ip != old_ip:
            logging.info(f"Successfully changed IP: {new_ip}")
            return True
        logging.warning(f"Attempt {attempt}: IP did not change. Retrying...")

    logging.error("Max attempts reached. Unable to change IP.")
    return False

# Disconnect the Mullvad VPN.
def disconnect_mullvad():
    """Run ``mullvad disconnect`` (its output is captured and not used), then wait 1 second."""
    logging.info("Disconnecting Mullvad VPN...")
    disconnect_cmd = ["mullvad", "disconnect"]
    subprocess.run(disconnect_cmd, capture_output=True)
    time.sleep(1)

In [4]:
import time

In [None]:
# Configure the Selenium WebDriver.
def setup_selenium_driver(download_dir):
    """Start a Chrome WebDriver that saves downloads straight into *download_dir*.

    *download_dir* is made absolute and created if it does not exist. Chrome gets
    ``--disable-blink-features=AutomationControlled`` (meant to make the browser
    less likely to be flagged as automated), ``--no-sandbox`` and
    ``--disable-dev-shm-usage``. Download preferences disable the save prompt, so
    files land directly in the directory.

    Returns:
        The running ``webdriver.Chrome`` instance; the caller is responsible for
        calling ``quit()`` on it.
    """
    target_dir = os.path.abspath(download_dir)
    os.makedirs(target_dir, exist_ok=True)

    chrome_options = Options()
    for flag in (
        "--disable-blink-features=AutomationControlled",
        "--no-sandbox",
        "--disable-dev-shm-usage",
    ):
        chrome_options.add_argument(flag)
    # "--start-maximized" is intentionally left off.

    download_prefs = dict(
        [
            ("download.default_directory", target_dir),
            ("download.prompt_for_download", False),
            ("download.directory_upgrade", True),
            ("safebrowsing.enabled", True),
            ("safebrowsing.disable_download_protection", True),
        ]
    )
    chrome_options.add_experimental_option("prefs", download_prefs)

    driver = webdriver.Chrome(
        service=Service(ChromeDriverManager().install()),
        options=chrome_options,
    )
    logging.info(f"Selenium WebDriver launched with download directory: {target_dir}")
    return driver

# Download the reports and rename them.
def _wait_for_new_xls(download_dir, existing, timeout):
    """Poll *download_dir* once a second for an .xls not in *existing*; return its path, or None after *timeout* seconds."""
    start_time = time.monotonic()
    while True:
        new_files = set(glob.glob(os.path.join(download_dir, "*.xls"))) - existing
        if new_files:
            # If several appeared (e.g. a late file from an earlier request), take the newest.
            return max(new_files, key=os.path.getctime)
        if time.monotonic() - start_time > timeout:
            return None
        time.sleep(1)


def download_and_rename_reports(batch_rows, timeout=30):
    """Download one report per row of *batch_rows*, rename it and move it to ./NamedReports.

    For each row (columns ``cosit``, ``begin_date``, ``end_date``; the dates must
    support ``strftime``), the GDOT report URL is opened in a Selenium-driven Chrome
    whose downloads go to ./Reports. The first *new* .xls that appears there within
    *timeout* seconds is moved to ``./NamedReports/<cosit>_<begin>_<end>.xls``.
    Otherwise the row is skipped.

    .xls files already in ./Reports before a request are ignored. This stops a stale
    file left by an earlier timed-out download from being misnamed as the current
    row's report. A download that arrives late *during* a later wait can still be
    misattributed.

    Args:
        batch_rows: DataFrame of work-list rows to fetch.
        timeout: seconds to wait for each download (default 30).
    """
    download_dir = os.path.abspath("Reports")
    named_reports_dir = os.path.abspath("NamedReports")
    os.makedirs(download_dir, exist_ok=True)
    os.makedirs(named_reports_dir, exist_ok=True)

    driver = setup_selenium_driver(download_dir)
    try:
        for _, row in batch_rows.iterrows():
            cosit = row['cosit']
            begin_date = row['begin_date'].strftime('%Y-%m-%d')
            end_date = row['end_date'].strftime('%Y-%m-%d')

            # Snapshot what is already in Reports so only this request's file is accepted.
            existing = set(glob.glob(os.path.join(download_dir, "*.xls")))

            url = f"https://gdottrafficdata.drakewell.com/tfdaysreport.asp?node=GDOT_CCS&cosit={cosit}&reportdate={begin_date}&enddate={end_date}&intval=4&dir=%-4&excel=1"
            logging.info(f"Opening URL: {url}")
            driver.get(url)

            downloaded_file = _wait_for_new_xls(download_dir, existing, timeout)
            if downloaded_file is None:
                logging.warning(f"Timeout reached for {url}. Skipping to next.")
                continue

            new_filename = f"{cosit}_{begin_date}_{end_date}.xls"
            shutil.move(downloaded_file, os.path.join(named_reports_dir, new_filename))
            logging.info(f"Downloaded and renamed: {new_filename}")
    finally:
        # Always close the browser, even if a request or file move raised.
        driver.quit()
        logging.info("Driver closed.")

# Scrape the work list in batches, switching VPN server before each batch.
def _expected_report_name(row):
    """Return the NamedReports file name for a work-list row: ``<cosit>_<YYYY-MM-DD>_<YYYY-MM-DD>.xls``."""
    return f"{row['cosit']}_{row['begin_date'].strftime('%Y-%m-%d')}_{row['end_date'].strftime('%Y-%m-%d')}.xls"


def scrape_batch_with_vpn(batch_number, max_retries=3):
    """Download pending reports listed in the global ``subset_df``, *batch_number* at a time.

    1. Rows whose file already exists in ./NamedReports are marked Tried = Retrieved = 1.
    2. Loop. Pick up to *batch_number* rows: never-tried rows (Tried == 0) first; once
       none are left, failed rows (Tried == 1, Retrieved == 0). Mark the picked rows
       Tried = 1, switch to a random Mullvad server, download them, then set
       Retrieved = 1 for every file that now exists. The VPN is always disconnected
       after each batch.
    3. Stop when nothing is selectable.

    Rows with a missing (NaT) begin/end date are skipped, because no URL or file name
    can be built for them. Within one call, each failed row is retried at most
    *max_retries* times. Without this cap, a report that can never be downloaded
    would keep the loop running forever.

    Mutates ``subset_df`` (columns ``Tried`` / ``Retrieved``) in place.
    """
    global subset_df

    named_reports_dir = os.path.abspath("NamedReports")

    def report_exists(row):
        return os.path.exists(os.path.join(named_reports_dir, _expected_report_name(row)))

    # strftime on NaT raises, so rows without both dates are excluded everywhere below.
    has_dates = subset_df['begin_date'].notna() & subset_df['end_date'].notna()
    if not has_dates.all():
        logging.warning(f"Skipping {int((~has_dates).sum())} rows with a missing begin_date/end_date.")

    # Step 1: Check NamedReports and update subset_df
    logging.info("Checking NamedReports for already downloaded files...")
    for idx, row in subset_df[has_dates].iterrows():
        if report_exists(row):
            subset_df.loc[idx, ['Tried', 'Retrieved']] = 1  # Mark as already scraped
    logging.info("Finished checking NamedReports. Now selecting batch for scraping.")

    # Retries made during this call for each row (aligned with subset_df's index).
    retries = pd.Series(0, index=subset_df.index)

    # Step 2: Main scraping loop
    while True:
        # Never-tried rows first; otherwise failed rows that still have retry budget.
        selectable = has_dates & (subset_df['Tried'] == 0)
        retrying = not selectable.any()
        if retrying:
            selectable = (
                has_dates
                & (subset_df['Tried'] == 1)
                & (subset_df['Retrieved'] == 0)
                & (retries < max_retries)
            )

        available_rows = subset_df[selectable]
        if available_rows.empty:
            logging.info("No more files left to try. Terminating loop.")
            unretrieved = int((has_dates & (subset_df['Retrieved'] == 0)).sum())
            if unretrieved:
                logging.warning(f"{unretrieved} rows remain unretrieved after {max_retries} retries each.")
            break

        # Randomly select up to batch_number rows (fixed seed for reproducibility).
        batch_rows = available_rows.sample(n=min(batch_number, len(available_rows)), random_state=42)

        # Mark selected rows as Tried and count the retry if this is a retry pass.
        subset_df.loc[batch_rows.index, 'Tried'] = 1
        if retrying:
            retries.loc[batch_rows.index] += 1

        try:
            # Change VPN to a new random location, then download the batch.
            change_mullvad_server(get_mullvad_servers())
            download_and_rename_reports(batch_rows)
        finally:
            # Always drop the VPN, even if the download step raised.
            disconnect_mullvad()

        # Step 3: Check downloaded files and update Retrieved column
        for idx, row in batch_rows.iterrows():
            if report_exists(row):
                subset_df.loc[idx, 'Retrieved'] = 1

        # NOTE: a random pause between batches (previously sketched as
        # time.sleep(random.uniform(10, 30)), to avoid exhausting VPN relays) is disabled.

In [11]:
# Start scraping, 10 reports per VPN batch.
scrape_batch_with_vpn(batch_number=10)

In [9]:
# Write the Tried/Retrieved progress back to the work-list workbook (index not written).
subset_df.to_excel(
    "updated_excel.xlsx",
    engine='openpyxl',
    index=False,
)