In [5]:
import os
import zipfile
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
import requests
from urllib.parse import urlparse
from tqdm import tqdm

# ==================== Configuration ====================
# URL of the plain-text index that get_export_urls() downloads and filters.
MASTER_FILELIST_URL = "http://data.gdeltproject.org/gdeltv2/masterfilelist.txt"
# Directory where downloaded .zip archives are saved.
DOWNLOAD_DIR = "gdelt_zips"
# Directory where CSV members are extracted before being parsed.
EXTRACT_DIR = "gdelt_temp_csvs"
# Path of the combined CSV written by main().
OUTPUT_CSV = "selected_columns.csv"
MAX_DOWNLOADS = 15000  # maximum number of URLs to process; 0 means no limit
MAX_WORKERS = 6    # number of concurrent download threads
RETRY_ATTEMPTS = 1 # number of retries after a failed download
# When true, each archive is deleted after it has been processed.
CLEANUP_ZIPS = True
# When true, each extracted CSV is deleted after it has been parsed.
CLEANUP_CSVS = True

# Configure logging: INFO level, timestamped messages.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# ================================================

def setup_dirs():
    """Create the download and extraction directories if they are missing."""
    for directory in (DOWNLOAD_DIR, EXTRACT_DIR):
        os.makedirs(directory, exist_ok=True)

def _parse_export_timestamp(url):
    """Parse the 14-digit timestamp that prefixes an export archive name.

    Args:
        url: URL taken from the master file list.

    Returns:
        A ``(year, month, day, hour, minute)`` tuple of ints, or ``None`` when
        the URL does not end in ``.export.CSV.zip`` or the file-name prefix is
        not a 14-character numeric timestamp.
    """
    if not url.endswith('.export.CSV.zip'):
        return None

    timestamp_str = url.split("/")[-1].split('.')[0]
    if len(timestamp_str) != 14:
        return None

    try:
        return (
            int(timestamp_str[:4]),
            int(timestamp_str[4:6]),
            int(timestamp_str[6:8]),
            int(timestamp_str[8:10]),
            int(timestamp_str[10:12]),
        )
    except ValueError:
        # Malformed file name: only a failed int() conversion is expected here.
        return None


def get_export_urls():
    """Fetch the master file list and return the export URLs to download.

    Each line of the list is split on whitespace and its third field is taken
    as the URL. A URL is kept when its file-name timestamp falls in the years
    2018-2024, on day 1 or 15 of the month, at hour 6 or 18, minute 0.

    Returns:
        The matching URLs in list order, truncated to ``MAX_DOWNLOADS`` entries
        (no truncation when ``MAX_DOWNLOADS`` is 0). An empty list is returned
        when the master file list cannot be fetched.
    """
    logger.info("Fetching master file list...")
    try:
        response = requests.get(MASTER_FILELIST_URL, timeout=30)
        response.raise_for_status()
    except requests.RequestException as e:
        logger.error(f"获取文件列表失败: {str(e)}")
        return []

    filtered_urls = []
    for line in response.text.splitlines():
        parts = line.split()
        if len(parts) < 3:
            continue
        url = parts[2]

        stamp = _parse_export_timestamp(url)
        if stamp is None:
            continue

        # The month is validated by the parser but not used for filtering.
        year, _month, day, hour, minute = stamp
        if 2018 <= year <= 2024 and day in (1, 15) and hour in (6, 18) and minute == 0:
            filtered_urls.append(url)

    # ``0 or None`` evaluates to None, i.e. an unbounded slice.
    return filtered_urls[:MAX_DOWNLOADS or None]

def download_with_retry(url):
    """Download ``url`` via ``download_zip``, retrying on failure.

    Up to ``RETRY_ATTEMPTS + 1`` attempts are made. Returns the path reported
    by ``download_zip`` on success, or ``None`` once the last attempt fails.
    """
    last_attempt = RETRY_ATTEMPTS
    for attempt_no in range(last_attempt + 1):
        try:
            zip_path = download_zip(url)
        except Exception:
            if attempt_no != last_attempt:
                logger.warning(f"第{attempt_no+1}次重试: {url}")
                continue
            logger.error(f"最终下载失败: {url}")
            return None
        else:
            return zip_path

def download_zip(url):
    """Stream ``url`` into ``DOWNLOAD_DIR`` with a progress bar.

    The payload is written to ``<name>.part`` and renamed to its final name
    only after the transfer completes. An interrupted download therefore never
    leaves a truncated archive that the "already exists" shortcut would later
    accept as complete.

    Args:
        url: Address of the archive; the file name is taken from the URL path.

    Returns:
        Path of the downloaded (or already present) archive.

    Raises:
        Exception: any error raised while requesting or writing the file is
            logged and re-raised.
    """
    filename = os.path.basename(urlparse(url).path)
    save_path = os.path.join(DOWNLOAD_DIR, filename)
    part_path = save_path + ".part"

    if os.path.exists(save_path):
        logger.info(f"文件已存在: {filename}")
        return save_path

    try:
        with requests.get(url, stream=True, timeout=60) as r:
            r.raise_for_status()
            total_size = int(r.headers.get('content-length', 0))

            with open(part_path, 'wb') as f, tqdm(
                desc=filename,
                total=total_size,
                unit='B',
                unit_scale=True,
                unit_divisor=1024,
            ) as pbar:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
                    pbar.update(len(chunk))

        # Publish the finished file under its final name in one step.
        os.replace(part_path, save_path)
        return save_path
    except Exception as e:
        logger.error(f"下载失败: {filename} - {str(e)}")
        raise
    finally:
        # After a successful rename the .part file no longer exists; on any
        # failure or interruption this discards the incomplete data.
        if os.path.exists(part_path):
            os.remove(part_path)

def process_csv(csv_path):
    """Read a tab-separated, header-less CSV and keep three columns.

    The file is read as strings in chunks of 10000 rows, and the columns at
    positions 4, 26 and the last position are retained from each chunk.

    NOTE(review): the caller labels these columns 'Time', 'EventRootCode' and
    'SOURCEURL'; confirm positions 4 and 26 against the GDELT export codebook.

    Returns:
        A DataFrame with the three selected columns, or an empty DataFrame
        when no chunk was read or any error occurred (the error is logged).
    """
    wanted_positions = [4, 26, -1]
    try:
        reader = pd.read_csv(
            csv_path,
            sep='\t',
            header=None,
            dtype=str,
            na_filter=False,
            chunksize=10000,
            encoding='utf-8',
            on_bad_lines='warn',
        )
        pieces = [part.iloc[:, wanted_positions] for part in reader]
        if not pieces:
            return pd.DataFrame()
        return pd.concat(pieces, ignore_index=True)
    except Exception as e:
        logger.error(f"CSV处理失败: {csv_path} - {str(e)}")
        return pd.DataFrame()

def _extract_member(zf, member, csv_path):
    """Copy one archive member to ``csv_path``, removing the file on failure."""
    import shutil  # local import: only needed by this helper

    try:
        with zf.open(member) as src, open(csv_path, 'wb') as dst:
            shutil.copyfileobj(src, dst)
    except BaseException:
        # Never leave a half-written CSV behind to be reused by a later run.
        if os.path.exists(csv_path):
            os.remove(csv_path)
        raise


def process_zip(zip_path):
    """Extract and parse every ``.export.CSV`` member of an archive.

    Each matching member is written to ``EXTRACT_DIR`` under its base name
    (skipped if that file already exists), parsed with ``process_csv`` and,
    when ``CLEANUP_CSVS`` is true, deleted again even if parsing raised.

    The member is streamed to the base-name path rather than extracted with
    ``ZipFile.extract``, which keeps the member's directory components and
    would place a nested member somewhere other than the path parsed here.

    Args:
        zip_path: Path of the archive to process.

    Returns:
        The concatenation of all non-empty per-member DataFrames, or an empty
        DataFrame when nothing was parsed or an error occurred (logged).
    """
    try:
        dfs = []
        with zipfile.ZipFile(zip_path, 'r') as zf:
            for member in zf.namelist():
                if not member.endswith('.export.CSV'):
                    continue

                csv_path = os.path.join(EXTRACT_DIR, os.path.basename(member))
                try:
                    if not os.path.exists(csv_path):
                        _extract_member(zf, member, csv_path)

                    df = process_csv(csv_path)
                    if not df.empty:
                        dfs.append(df)
                finally:
                    if CLEANUP_CSVS and os.path.exists(csv_path):
                        os.remove(csv_path)

        return pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()
    except Exception as e:
        logger.error(f"ZIP处理失败: {zip_path} - {str(e)}")
        return pd.DataFrame()

def main():
    """Download, extract and merge the selected export files into one CSV.

    Downloads run on a thread pool; each finished archive is parsed in the
    calling thread and optionally deleted. The merged rows are labelled
    'Time', 'EventRootCode' and 'SOURCEURL', de-duplicated and written to
    ``OUTPUT_CSV``.
    """
    setup_dirs()

    urls = get_export_urls()
    if not urls:
        logger.error("未找到有效下载地址")
        return

    logger.info(f"开始处理 {len(urls)} 个文件，使用 {MAX_WORKERS} 个线程...")

    all_data = []
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        pending = [executor.submit(download_with_retry, url) for url in urls]

        for finished in as_completed(pending):
            zip_path = finished.result()
            if not zip_path:
                # Download failed after all retries; nothing to process.
                continue

            events = process_zip(zip_path)
            if not events.empty:
                all_data.append(events)
                logger.info(f"处理完成: {os.path.basename(zip_path)} - 事件数: {len(events)}")

            if CLEANUP_ZIPS and os.path.exists(zip_path):
                os.remove(zip_path)

    if not all_data:
        logger.warning("⚠ 未找到有效事件数据")
        return

    final_df = pd.concat(all_data, ignore_index=True)
    final_df.columns = ['Time', 'EventRootCode', 'SOURCEURL']

    # Drop rows that are identical in all three columns.
    before = len(final_df)
    final_df = final_df.drop_duplicates()
    logger.info(f"去重完成: 移除 {before - len(final_df)} 条重复记录")

    final_df.to_csv(OUTPUT_CSV, index=False)
    logger.info(f"★ 处理完成！最终事件数: {len(final_df)}")
    logger.info(f"输出文件路径: {os.path.abspath(OUTPUT_CSV)}")


if __name__ == "__main__":
    main()

2025-03-17 00:43:06,742 - INFO - Fetching master file list...
2025-03-17 00:43:20,110 - INFO - 开始处理 324 个文件，使用 6 个线程...
20180101060000.export.CSV.zip: 100%|██████████| 46.4k/46.4k [00:00<00:00, 2.58MB/s]
2025-03-17 00:43:20,480 - INFO - 处理完成: 20180101060000.export.CSV.zip - 事件数: 777
20180101180000.export.CSV.zip: 100%|██████████| 48.9k/48.9k [00:00<00:00, 1.94MB/s]
2025-03-17 00:43:20,651 - INFO - 处理完成: 20180101180000.export.CSV.zip - 事件数: 749
20180115180000.export.CSV.zip: 100%|██████████| 130k/130k [00:00<00:00, 3.34MB/s]
2025-03-17 00:43:20,725 - INFO - 处理完成: 20180115180000.export.CSV.zip - 事件数: 1942
20180301060000.export.CSV.zip: 100%|██████████| 114k/114k [00:00<00:00, 2.24MB/s]
20180301180000.export.CSV.zip:   0%|          | 0.00/177k [00:00<?, ?B/s]2025-03-17 00:43:20,861 - INFO - 处理完成: 20180301060000.export.CSV.zip - 事件数: 1677

20180301180000.export.CSV.zip: 100%|██████████| 177k/177k [00:00<00:00, 2.67MB/s]
20180315060000.export.CSV.zip: 100%|██████████| 105k/105k [00:00<00:00

In [6]:
import csv

# Input file path
input_file = 'selected_columns.csv'
# Output file path
output_file = 'filtered_events.csv'

# Two-character EventRootCode prefixes to keep, grouped by category
political_codes = {'01', '02', '03', '04', '05', '06', '14', '17', '18', '19'}
economic_codes = {'07', '08', '09', '10', '11', '12', '13', '15', '16'}
# Both categories are kept, so build the lookup set once.
selected_codes = political_codes | economic_codes

with open(input_file, 'r', newline='', encoding='utf-8') as infile, \
     open(output_file, 'w', newline='', encoding='utf-8') as outfile:
    reader = csv.reader(infile)
    writer = csv.writer(outfile)

    # Copy the header row; an empty input simply yields an empty output.
    header = next(reader, None)
    if header is not None:
        writer.writerow(header)

    for row in reader:
        # Blank or truncated rows have no second column to inspect.
        if len(row) < 2:
            continue
        # EventRootCode is the second column; compare its first two characters.
        # A value shorter than two characters can never be in the set.
        if row[1][:2] in selected_codes:
            writer.writerow(row)

print(f"筛选完成，结果已保存到 {output_file}")

筛选完成，结果已保存到 filtered_events.csv


In [None]:
import csv
import hashlib
import re
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import requests
from bs4 import BeautifulSoup
import trafilatura
from simhash import Simhash
import threading

# ===== Configuration =====
# Note: OUTPUT_CSV and MAX_WORKERS rebind names also assigned in the download cell.
INPUT_CSV = 'filtered_events.csv'     # input file path (read for 'Time' and 'SOURCEURL')
OUTPUT_CSV = 'cleaned_dataset.csv'    # output file path
MAX_WORKERS = 6                       # number of concurrent worker threads
REQUEST_TIMEOUT = 15                  # request timeout in seconds
MIN_CONTENT_LENGTH = 150              # cleaned text shorter than this is discarded

# ===== 去重系统 =====
class ContentDeduplicator:
    """Detect exact and near-duplicate text across worker threads.

    Exact duplicates are recognised by MD5 digest; near duplicates by a
    Simhash distance of at most 3 to any previously accepted text. Shared
    state is only read or modified while holding ``lock``.
    """

    def __init__(self):
        self.seen_hashes = set()      # MD5 hex digests of accepted content
        self.simhashes = []           # Simhash objects of accepted content
        self.lock = threading.Lock()  # guards both collections above

    def is_duplicate(self, content):
        """Return True if ``content`` was seen before, else register it.

        Args:
            content: Text to check.

        Returns:
            True when the text matches an earlier one exactly or within the
            Simhash distance threshold; False after recording it as new.
        """
        digest = hashlib.md5(content.encode()).hexdigest()

        # Cheap exact-match test first, so known content skips the Simhash work.
        with self.lock:
            already_seen = digest in self.seen_hashes
        if already_seen:
            return True

        # The fingerprint is computed without holding the lock.
        fingerprint = Simhash(re.findall(r'\w+', content.lower()))

        with self.lock:
            # Re-check: the digest may have been added while the lock was released.
            if digest in self.seen_hashes:
                return True
            if any(fingerprint.distance(known) <= 3 for known in self.simhashes):
                return True
            self.seen_hashes.add(digest)
            self.simhashes.append(fingerprint)
            return False

# ===== 核心引擎 =====
class WebScraperEngine:
    """Fetch pages, extract and clean their main text, and drop duplicates."""

    # Cleaning patterns, compiled once and applied in this order.
    _LINE_BREAKS = re.compile(r'[\n\r\t]')
    _SPACE_RUNS = re.compile(r'\s{2,}')
    # The two word-based filters are deliberately case-sensitive. Matching them
    # case-insensitively made ordinary words ("may", "march", "share",
    # "related", "follow") trigger the cuts and erase most of an article.
    _LEADING_PREAMBLE = re.compile(
        r'^.*?(?=\b(?:January|February|March|April|May|June|July|August|September|October|November|December)\b)'
    )
    _TRAILING_BOILERPLATE = re.compile(
        r'\b(?:Follow|Share|Comments?|Sign up|Subscribe|Related|Read more)\b.*$'
    )
    _CONTROL_CHARS = re.compile(r'[\x00-\x1F\x7F-\x9F]')
    _DISALLOWED_CHARS = re.compile(r'[^\w\s.,!?%$&/-]')

    def __init__(self):
        self.session = self._create_session()
        self.deduplicator = ContentDeduplicator()

    def _create_session(self):
        """Build a requests session that retries 500/502/503/504 responses."""
        session = requests.Session()
        retry = Retry(
            total=3,
            backoff_factor=0.5,
            status_forcelist=[500, 502, 503, 504]
        )
        adapter = HTTPAdapter(max_retries=retry)
        session.mount('http://', adapter)
        session.mount('https://', adapter)
        return session

    def _extract_content(self, html):
        """Return the main text of ``html``.

        trafilatura is tried first and its result is used when longer than 500
        characters. Otherwise the text of the first matching article-like
        element is returned, or the whole page text if none matches. An empty
        string is returned when the fallback parser fails.
        """
        try:
            content = trafilatura.extract(
                html,
                include_comments=False,
                include_tables=False,
                no_fallback=True
            )
            if content and len(content) > 500:
                return content
        except Exception:
            # Best effort: any extractor failure falls through to the
            # selector-based fallback below.
            pass

        try:
            soup = BeautifulSoup(html, 'lxml')
            selectors = [
                'article',
                'div.article-body',
                'div.main-content',
                'div.story-content',
                'div.post-content'
            ]
            for selector in selectors:
                if (element := soup.select_one(selector)):
                    return element.get_text()
            return soup.get_text()
        except Exception:
            return ""

    def _clean_content(self, text):
        """Normalise whitespace and strip boilerplate from extracted text.

        Steps, in order: line breaks and tabs become spaces; whitespace runs
        collapse to one space; everything before the first capitalised month
        name is dropped; everything from the first capitalised boilerplate
        keyword (Follow, Share, Comment(s), Sign up, Subscribe, Related,
        Read more) to the end is dropped; control characters and characters
        outside ``\\w``, whitespace and ``.,!?%$&/-`` are removed.

        Args:
            text: Raw extracted text; may be empty or None.

        Returns:
            The cleaned text, stripped and truncated to 10000 characters, or
            an empty string for falsy input.
        """
        if not text:
            return ""

        text = self._LINE_BREAKS.sub(' ', text)
        text = self._SPACE_RUNS.sub(' ', text)

        for pattern in (
            self._LEADING_PREAMBLE,
            self._TRAILING_BOILERPLATE,
            self._CONTROL_CHARS,
            self._DISALLOWED_CHARS,
        ):
            text = pattern.sub('', text)

        return text.strip()[:10000]

    def process_url(self, time, url):
        """Fetch ``url`` and return ``(time, cleaned_text)`` or ``None``.

        ``None`` is returned when the request or extraction fails, when the
        cleaned text is shorter than ``MIN_CONTENT_LENGTH``, or when the
        deduplicator reports the text as already seen.

        Args:
            time: Value passed through unchanged as the first tuple element.
            url: Address of the page to fetch.
        """
        try:
            response = self.session.get(
                url,
                headers={'User-Agent': 'Mozilla/5.0'},
                timeout=REQUEST_TIMEOUT
            )
            response.raise_for_status()

            raw_content = self._extract_content(response.text)
            cleaned_content = self._clean_content(raw_content)

            if len(cleaned_content) < MIN_CONTENT_LENGTH:
                return None
            if self.deduplicator.is_duplicate(cleaned_content):
                return None

            return time, cleaned_content
        except Exception:
            # Best effort per URL: one failing page must not stop the run.
            return None

# ===== 主程序 =====
def main():
    """Scrape every URL listed in ``INPUT_CSV`` and write the cleaned texts.

    Reads the 'Time' and 'SOURCEURL' columns, fetches the pages on a thread
    pool, and writes one ``Time, Content`` row to ``OUTPUT_CSV`` per page that
    yields a result. Progress is reported as the number of finished requests
    out of the total, followed by how many of them produced a row.
    """
    engine = WebScraperEngine()

    # newline='' lets the csv module handle line endings inside quoted fields.
    with open(INPUT_CSV, 'r', newline='', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        tasks = [(row['Time'], row['SOURCEURL']) for row in reader]

    with open(OUTPUT_CSV, 'w', newline='', encoding='utf-8') as f_out:
        writer = csv.writer(f_out)
        writer.writerow(['Time', 'Content'])

        total = len(tasks)
        completed = 0  # finished requests, whether or not they produced a row
        saved = 0      # rows actually written

        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            futures = [executor.submit(engine.process_url, t, u) for t, u in tasks]

            for future in as_completed(futures):
                completed += 1
                result = future.result()
                if result:
                    writer.writerow(result)
                    saved += 1
                print(
                    f'\r处理进度: {completed}/{total} ({completed/total:.1%}) 有效: {saved}',
                    end=''
                )


if __name__ == "__main__":
    start_time = time.time()
    main()
    print(f"\n处理完成！总耗时: {time.time() - start_time:.1f}秒")

处理进度: 3/91113 (0.0%)