In [None]:
import requests
from bs4 import BeautifulSoup
import time
import random
import json
import os
import re
from datetime import datetime, timedelta
import threading
import queue
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
import imessage
import subprocess

# Pool of browser User-Agent strings; fetch_page() picks one at random
# (random.choice) for every outgoing request.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
    "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:88.0) Gecko/20100101 Firefox/88.0",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Mobile/15E148 Safari/604.1",
]

# Default input file (one article link per line) and output directory.
# NOTE: both INPUT_FILE and OUTPUT_DIR are reassigned per date by the
# __main__ loop at the bottom of this file before process_articles() runs.
INPUT_FILE = "articles_links/20190105.txt"
OUTPUT_DIR = "reuters_articles/20190105"
# File that log_error() appends timestamped error lines to.
LOG_FILE = "reuters_crawl_log.txt"
# Import-time side effect: make sure the default output directory exists.
if not os.path.exists(OUTPUT_DIR):
    os.makedirs(OUTPUT_DIR)

def log_error(message):
    """
        Append one timestamped line containing ``message`` to LOG_FILE.

        The line has the form ``YYYY-MM-DD HH:MM:SS - <message>``; the file
        is opened in append mode with UTF-8 encoding on every call.
    """
    timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
    entry = f"{timestamp} - {message}\n"
    with open(LOG_FILE, "a", encoding="utf-8") as log_handle:
        log_handle.write(entry)

def read_links(file_path):
    """
        Load the de-duplicated article links stored in ``file_path``.

        Each line is stripped of surrounding whitespace and blank lines are
        skipped. Returns a list of unique links in arbitrary order. If the
        file does not exist, the problem is printed and logged through
        log_error() and an empty list is returned.
    """
    unique_links = set()
    try:
        with open(file_path, "r", encoding="utf-8") as handle:
            for raw_line in handle:
                url = raw_line.strip()
                if url:
                    unique_links.add(url)
    except FileNotFoundError:
        print(f"File not found: {file_path}")
        log_error(f"File not found: {file_path}")
        return []
    return list(unique_links)

def check_google():
    """
        Report whether https://www.google.com answers with HTTP 200.

        Issues a GET with a 5 second timeout. Returns False when the request
        raises requests.RequestException, otherwise True only if the status
        code is exactly 200.
    """
    try:
        reply = requests.get("https://www.google.com", timeout=5)
    except requests.RequestException:
        # No usable response (connection error, timeout, ...).
        return False
    return reply.status_code == 200

def send_switch_ip_msg():
    """
        Send a fixed iMessage and block until connectivity is confirmed.

        Sends the message through imessage.send() to a hard-coded recipient
        (presumably the cue to switch the outgoing IP, going by the function
        name -- confirm with the receiving side), waits 20 seconds, and then
        polls check_google() every 5 seconds until it returns True.
    """
    # send iMessage to iPhone
    imessage.send(['huiyang.han@gmail.com'], 'Hi Harry!')
    print('Sent')
    time.sleep(20)
    while True:
        if check_google():
            break
        time.sleep(5)

def fetch_page(url, redirect=False):
    """
        Get article page content by url.

        Args:
            url: Address of the page to download.
            redirect: True when this call is itself following a ``Location``
                header from a previous call. Used to limit manual redirect
                following to a single extra hop.

        Returns:
            The response body as text, or None when the request fails
            (the failure is recorded with log_error()).
    """
    # Local import keeps this block self-contained.
    from urllib.parse import urljoin

    headers = {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
        "Referer": "https://www.google.com/",
        "Connection": "keep-alive",
    }
    try:
        response = requests.get(url, headers=headers, timeout=15, allow_redirects=True)

        # requests already follows ordinary redirects; if the final response
        # still carries a Location header, follow it manually exactly once.
        # The original code called an undefined helper here (NameError);
        # recursing into fetch_page is the intended behaviour, and the
        # `redirect` flag prevents unbounded recursion.
        location = response.headers.get('Location')
        if location and not redirect:
            # Location may be relative; resolve it against the final URL.
            return fetch_page(urljoin(response.url, location), True)

        response.raise_for_status()
        return response.text
    except requests.RequestException as e:
        error_msg = f"Request failed: {url}, error: {e}"
        log_error(error_msg)
        return None

def parse_article(html):
    """
        Extract the article fields from a page's HTML.

        Returns a dict with the keys ``title``, ``date``, ``time``,
        ``updated``, ``body`` and ``tags``. The three date fields are filled
        only when at least three date-line elements are found; otherwise all
        three are empty strings.
    """
    soup = BeautifulSoup(html, 'html.parser')

    # Title comes from the <title> element, if any.
    title_node = soup.find('title')
    title = title_node.get_text(strip=True) if title_node else ""

    # The first three date-line elements are date, time and updated.
    date_nodes = soup.find_all(class_='date-line__date___kNbY')
    if len(date_nodes) >= 3:
        date, time_, updated = (node.get_text(strip=True) for node in date_nodes[:3])
    else:
        date = time_ = updated = ""

    # Body is the concatenated text of every article-body element.
    body_parts = []
    for node in soup.find_all(class_='article-body__content__17Yit'):
        body_parts.append(node.get_text(strip=True))
    body = "".join(body_parts)

    # Tags: drop the 'Suggested Topics:' label, skipping entries that are
    # empty once the label is removed.
    label = "Suggested Topics:"
    tags = []
    for node in soup.find_all(attrs={'aria-label': 'Tags'}):
        text = node.get_text(strip=True)
        if not text.startswith(label):
            tags.append(text)
            continue
        remainder = text.replace(label, "").strip()
        if remainder:
            tags.append(remainder)

    return {
        "title": title,
        "date": date,
        "time": time_,
        "updated": updated,
        "body": body,
        "tags": tags
    }

def sanitize_filename(title):
    """
        Turn an article title into a string usable as a file name.

        Each of the characters < > : " / \\ | ? * is replaced by an
        underscore, and the result is truncated to at most 200 characters.
    """
    forbidden = '<>:"/\\|?*'
    replacement_table = str.maketrans({char: '_' for char in forbidden})
    cleaned = title.translate(replacement_table)
    return cleaned[:200]

def save_article(article_data, index):
    """
        Save article content as a json file inside OUTPUT_DIR.

        Args:
            article_data: Dict produced by parse_article(); its "title"
                entry decides the file name.
            index: Number used for the fallback name ``article_NNNN.json``
                when the article has no title.

        If the chosen name is already taken, ``_1``, ``_2``, ... is appended
        to the stem until a free name is found.
    """
    if article_data["title"]:
        sanitized_title = sanitize_filename(article_data["title"])
        stem = f"{OUTPUT_DIR}/{sanitized_title}"
    else:
        stem = f"{OUTPUT_DIR}/article_{index:04d}"

    filename = f"{stem}.json"
    counter = 1
    while True:
        try:
            # "x" creates the file atomically and fails if it already exists.
            # The previous exists()-then-open("w") sequence let two worker
            # threads pick the same free name and overwrite each other.
            handle = open(filename, "x", encoding="utf-8")
        except FileExistsError:
            filename = f"{stem}_{counter}.json"
            counter += 1
            continue
        with handle:
            json.dump(article_data, handle, ensure_ascii=False, indent=4)
        return

class ThreadController:
    """
        Shared pause switch and processed-link counter for worker threads.

        ``pause_event`` is set while workers may run and cleared while they
        must wait. ``processed_count`` is guarded by ``count_lock``.
    """

    def __init__(self):
        self.pause_event = threading.Event()
        # Start in the "running" state so check_pause() does not block.
        self.pause_event.set()
        self.processed_count = 0
        self.count_lock = Lock()

    def pause_all(self):
        """
            Pause all the threads.
        """
        self.pause_event.clear()

    def resume_all(self):
        """
            Resume all the threads.
        """
        self.pause_event.set()

    def check_pause(self):
        """
            Block the calling thread while the controller is paused.
        """
        self.pause_event.wait()

    def increment_count(self):
        """
            Increase the processed counter thread-safely and return it.

            On every 100th increment all workers are paused,
            send_switch_ip_msg() is called, and the workers are resumed.
            count_lock stays held during that call, so other threads calling
            increment_count() also wait until it finishes.
        """
        with self.count_lock:
            self.processed_count += 1
            if self.processed_count % 100 == 0:
                self.pause_all()
                try:
                    send_switch_ip_msg()
                finally:
                    # Always resume: if the notification raises, workers
                    # blocked in check_pause() would otherwise wait forever.
                    self.resume_all()
            return self.processed_count
    

def process_chunk(links_chunk, thread_id, controller):
    """
        Download, parse and save every link of one chunk.

        For each link the worker first honours a pending pause, bumps the
        shared counter, and then tries to fetch the page: one initial
        attempt plus up to five retries, each retry preceded by a 0.5 s
        sleep. A page counts as usable when it is non-empty and does not
        contain 'Please enable JS'. The link's 1-based position in the chunk
        is passed to save_article() as the fallback file index. Any
        exception while handling a link is printed and the loop moves on.
    """
    max_attempts = 6  # 1 initial try + 5 retries
    for position, link in enumerate(links_chunk, 1):
        # Wait here while the controller is paused.
        controller.check_pause()

        # Count the link as handled before fetching it.
        controller.increment_count()

        try:
            for attempt in range(max_attempts):
                if attempt:
                    time.sleep(0.5)
                html = fetch_page(link)
                if html and 'Please enable JS' not in html:
                    save_article(parse_article(html), position)
                    break
        except Exception as e:
            print(f"Thread {thread_id}: Error processing link {link}: {e}")
            continue

def process_articles():
    '''
        Process all the article links of INPUT_FILE with 10 parallel threads.

        Links are shuffled and handled in batches of 100. Each batch is split
        into at most 10 chunks, one per worker thread, and gets its own
        ThreadController. Prints per-batch and per-day processed counts.
    '''
    links = read_links(INPUT_FILE)
    #links = links[0:10]
    if not links:
        print("No link found, exit the process")
        return

    random.shuffle(links)

    batch_size = 100
    num_workers = 10
    total_num = 0
    for start in range(0, len(links), batch_size):
        # Get current batch by slicing.
        batch = links[start:start + batch_size]

        # Split this batch into at most `num_workers` chunks. Ceiling
        # division guarantees that; the previous floor division produced
        # more than 10 chunks for sizes such as 47 or 84, and the extra
        # chunks (and their links) were silently dropped.
        chunk_size = -(-len(batch) // num_workers)
        link_chunks = [
            batch[offset:offset + chunk_size]
            for offset in range(0, len(batch), chunk_size)
        ]

        print(f"Start to process {len(batch)} links with 10 threads...")

        # New a thread controller
        controller = ThreadController()

        # Use ThreadPoolExecutor to handle 10 threads
        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            futures = [
                executor.submit(process_chunk, chunk, idx, controller)
                for idx, chunk in enumerate(link_chunks)  # never more than 10 chunks
            ]

            # Wait till all the threads end
            for future in futures:
                try:
                    future.result()
                except Exception as e:
                    print(f"Thread execution failed: {e}")
        # Output the final result
        print(f"Batch processing completed. Total links processed: {controller.processed_count}")
        total_num += controller.processed_count
    # Report the sum over all batches, not just the last batch's counter.
    print(f"Day processing completed. Total links processed: {total_num}")
    

if __name__ == "__main__":
    # Inclusive date range to crawl, one day at a time.
    start_date = datetime(2021, 1, 3)
    end_date = datetime(2021, 1, 31)

    #send_switch_ip_msg()

    day_count = (end_date - start_date).days + 1
    for day_offset in range(day_count):
        current_date = start_date + timedelta(days=day_offset)
        date_str = current_date.strftime("%Y%m%d")
        print(f'Start to process {date_str}, Current time:{datetime.now()}')

        # Point the module-level settings at this day's link file and
        # output directory; process_articles() and save_article() read them.
        INPUT_FILE = f"articles_links/{date_str}.txt"
        OUTPUT_DIR = f"reuters_articles/{date_str}"

        # Create the day's output directory if it is missing.
        if not os.path.exists(OUTPUT_DIR):
            os.makedirs(OUTPUT_DIR)
        process_articles()


Start to process 20210103, Current time:2025-03-03 18:48:20.408000
Start to process 100 links with 10 threads...
Sent
Batch processing completed. Total links processed: 100
Start to process 100 links with 10 threads...
Sent
Batch processing completed. Total links processed: 100
Start to process 100 links with 10 threads...
Sent
Batch processing completed. Total links processed: 100
Start to process 47 links with 10 threads...
Batch processing completed. Total links processed: 40
Day processing completed. Total links processed: 40
Start to process 20210104, Current time:2025-03-03 18:51:43.978505
Start to process 100 links with 10 threads...
Sent
Batch processing completed. Total links processed: 100
Start to process 100 links with 10 threads...
Sent
Batch processing completed. Total links processed: 100
Start to process 84 links with 10 threads...
Batch processing completed. Total links processed: 80
Day processing completed. Total links processed: 80
Start to process 20210105, Current