In [57]:
import json
import os
import requests
import sys
import time
import warnings

from typing import List, Dict, Optional, Tuple, Any, Set
from datetime import datetime, timedelta
from functools import wraps
from bs4 import BeautifulSoup
from urllib.parse import urlparse


# Add the project root (two directories above the current working directory) to the
# Python path so that `src.*` modules can be imported from this notebook.
project_root = os.path.abspath(os.path.join(os.getcwd(), '..', '..'))
# Guard the append: notebook cells are routinely re-run, and an unconditional append
# would add the same entry to sys.path again on every execution.
if project_root not in sys.path:
    sys.path.append(project_root)

from src.utils.helper import load_env

In [None]:
# Suppress every warning category for the rest of the kernel session.
warnings.filterwarnings('ignore')

# load_env comes from src.utils.helper; presumably it populates os.environ
# (e.g. from a .env file) -- confirm against that helper.
load_env()

# WordPress.com credentials read from the environment; each is None when unset.
CLIENT_SECRET = os.environ.get("WPCOM_CLIENT_SECRET")
ACCESS_TOKEN = os.environ.get("WPCOM_ACCESS_TOKEN")

In [20]:
# WordPress.com OAuth2 application settings.
# CLIENT_ID and REDIRECT_URI hold placeholder values here.
CLIENT_ID = "<CLIENT_ID>"
REDIRECT_URI = "<REDIRECT_URI>"

# WordPress.com OAuth2 endpoints.
AUTHORIZATION_BASE_URL = "https://public-api.wordpress.com/oauth2/authorize"
TOKEN_URL = "https://public-api.wordpress.com/oauth2/token"

In [58]:
def save_json_file(filename: str, data: List[Dict]):
    """
    Merge posts into a JSON file, keeping at most one entry per post URL.

    If the file already exists its contents are loaded first and kept in their
    original order; entries from ``data`` are appended after them. A post is
    skipped when its ``'URL'`` is already present in the file *or* appeared
    earlier in ``data``, so the first occurrence of each URL wins.

    Args:
        filename (str): Path of the JSON file to create or update.
        data (List[Dict]): Posts to merge in. Every post must have a ``'URL'`` key.

    Raises:
        KeyError: If a post (stored or new) has no ``'URL'`` key.
        json.JSONDecodeError: If the existing file does not contain valid JSON.
    """
    existing_data = []
    if os.path.exists(filename):
        with open(filename, 'r', encoding='utf-8') as f:
            existing_data = json.load(f)

    # Track URLs as we go so that duplicates *within* the incoming batch are
    # dropped too, not only those that clash with what is already on disk.
    seen_urls = {post['URL'] for post in existing_data}
    for post in data:
        post_url = post['URL']
        if post_url in seen_urls:
            continue
        seen_urls.add(post_url)
        existing_data.append(post)

    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(existing_data, f, ensure_ascii=False, indent=2)

def save_posts_to_disk(posts: List[Dict], site: str, usernames: List[str], output_dir: str = "data"):
    """
    Persist posts as JSON, once per site and once per matching author.

    Files live under ``<cwd>/../../<output_dir>``: the full list goes to
    ``domain_posts/<site>.json`` and, for every username that authored at least
    one of the posts (case-insensitive match on ``'author_login'``), the matching
    subset goes to ``author_posts/<username>.json``. Finally the posts are handed
    to ``handle_cross_posts``.

    Args:
        posts (List[Dict]): Post records to store.
        site (str): Domain of the site; used as the domain file name.
        usernames (List[str]): Author logins that get their own file.
        output_dir (str): Directory name below the project root.
    """
    root = os.path.abspath(os.path.join(os.getcwd(), '..', '..'))
    by_domain_dir = os.path.join(root, output_dir, 'domain_posts')
    by_author_dir = os.path.join(root, output_dir, 'author_posts')

    # Both target directories are created up front, before anything is written.
    for directory in (by_domain_dir, by_author_dir):
        os.makedirs(directory, exist_ok=True)

    # Complete list for the site.
    site_path = os.path.join(by_domain_dir, f"{site}.json")
    save_json_file(site_path, posts)
    print(f"All posts for site {site} saved to {site_path}")

    # One file per requested author, skipped when that author has no posts here.
    for username in usernames:
        written_by_user = [
            entry for entry in posts
            if entry['author_login'].lower() == username.lower()
        ]
        if not written_by_user:
            continue
        user_path = os.path.join(by_author_dir, f"{username}.json")
        save_json_file(user_path, written_by_user)
        print(f"Posts for author {username} saved to {user_path}")

    handle_cross_posts(posts, site, output_dir)

def load_posts_from_disk(site: str, username: str = None, output_dir: str = "data") -> List[Dict]:
    """
    Read previously saved posts for an author or for a whole site.

    The file is looked up under ``<cwd>/../../<output_dir>``: in
    ``author_posts/<username>.json`` when a username is given, otherwise in
    ``domain_posts/<site>.json``.

    Args:
        site (str): The site domain.
        username (str, optional): Author login to load. When falsy, the site file is used.
        output_dir (str): Directory name below the project root.

    Returns:
        List[Dict]: The stored posts, or an empty list when the file does not exist.
    """
    root = os.path.abspath(os.path.join(os.getcwd(), '..', '..'))

    # Choose the folder and file stem depending on what was asked for.
    subfolder, stem = ('author_posts', username) if username else ('domain_posts', site)
    path = os.path.join(root, output_dir, subfolder, f"{stem}.json")

    if not os.path.exists(path):
        print(f"No saved posts found for {'author ' + username if username else 'site ' + site}")
        return []

    with open(path, 'r', encoding='utf-8') as handle:
        loaded = json.load(handle)
    print(f"Loaded {len(loaded)} posts from {path}")
    return loaded

def handle_cross_posts(posts: List[Dict], file_domain: str, output_dir: str = "data"):
    """
    Copy posts that belong to another domain into that domain's own file.

    A post counts as a cross-post when the host part of its ``'URL'`` differs
    from ``file_domain``. Cross-posts are grouped by host and each group is
    merged into ``<cwd>/../../<output_dir>/domain_posts/<host>.json``.

    Args:
        posts (List[Dict]): Posts to inspect; each needs a ``'URL'`` key.
        file_domain (str): Domain of the file the posts were collected for.
        output_dir (str): Directory name below the project root.
    """
    root = os.path.abspath(os.path.join(os.getcwd(), '..', '..'))
    target_dir = os.path.join(root, output_dir, 'domain_posts')

    # Bucket foreign posts by the host found in their URL.
    grouped = {}
    for entry in posts:
        host = urlparse(entry['URL']).netloc
        if host == file_domain:
            continue
        grouped.setdefault(host, []).append(entry)

    for host, entries in grouped.items():
        target_path = os.path.join(target_dir, f"{host}.json")
        save_json_file(target_path, entries)
        print(f"Wrote {len(entries)} cross-posts to {target_path}")

In [59]:
def rate_limit(max_requests: int = 100, period: int = 10):
    """
    Decorator factory that throttles calls to at most ``max_requests`` per ``period``.

    The timestamps of the most recent ``max_requests`` calls are kept in a
    history owned by this ``rate_limit(...)`` invocation. When the history is
    full, a new call is delayed until the oldest remembered call is at least
    ``period`` seconds old. The timestamp stored for a call is taken *after* any
    delay, i.e. it is the moment the wrapped function actually starts; storing
    the pre-sleep time would make delayed calls look older than they are and
    let later calls through too early.

    Time is measured with ``time.monotonic`` so system clock adjustments cannot
    shorten or lengthen the wait. No locking is performed.

    Args:
        max_requests (int): Maximum number of requests allowed in the given period.
            Values below 1 are treated as 1.
        period (int): Time period in seconds.

    Returns:
        A decorator that wraps a function with the throttling logic.
    """
    calls: List[float] = []
    # A limit below one would mean popping from an empty history.
    limit = max(1, max_requests)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            now = time.monotonic()
            if len(calls) >= limit:
                # The oldest call leaves the window once this call replaces it.
                oldest = calls.pop(0)
                wait = period - (now - oldest)
                if wait > 0:
                    time.sleep(wait)
                    # Re-read the clock so the stored timestamp reflects when
                    # the request really goes out.
                    now = time.monotonic()
            calls.append(now)
            return func(*args, **kwargs)
        return wrapper
    return decorator

In [60]:
@rate_limit()
def get_post_data(site: str, post_id: int, access_token: str, timeout: float = 30.0) -> Optional[Dict]:
    """
    Fetch one post and flatten it into the record layout that is stored on disk.

    Up to three GET requests are sent to the WordPress.com REST API v1.1: the
    post itself, its replies (for the comment count) and its stats (for the
    view count). If either of the two secondary requests fails, the
    corresponding count is recorded as 0.

    Args:
        site (str): Site identifier placed in the API path.
        post_id (int): ID of the post to fetch.
        access_token (str): Bearer token for API authentication.
        timeout (float): Seconds to wait for each HTTP request. Without a
            timeout a stalled connection would block the crawl indefinitely.

    Returns:
        Optional[Dict]: The flattened post record, or None when the post could
        not be fetched (non-200 response or transport error).
    """
    site_api = f"https://public-api.wordpress.com/rest/v1.1/sites/{site}"
    headers = {"Authorization": f"Bearer {access_token}"}
    params = {
        "fields": "ID,site_ID,author,date,title,URL,content,excerpt,status,like_count,comment_count,tags,categories"
    }

    try:
        response = requests.get(f"{site_api}/posts/{post_id}", headers=headers, params=params, timeout=timeout)
    except requests.RequestException as exc:
        # Report transport failures the same way as HTTP errors instead of
        # aborting the whole crawl.
        print(f"Error: Unable to fetch post data for post ID {post_id} on site '{site}'. Request failed: {exc}")
        return None

    if response.status_code != 200:
        print(f"Error: Unable to fetch post data for post ID {post_id} on site '{site}'. HTTP Status Code: {response.status_code}")
        return None

    post = response.json()
    processed_post = {
        'post_ID': post['ID'],
        'site_ID': post['site_ID'],
        'site_domain': site,
        'author_user_ID': post['author']['ID'],
        'author_login': post['author']['login'],
        'date': post['date'],
        'title': post['title'],
        'URL': post['URL'],
        'content': post['content'],
        'excerpt': post['excerpt'],
        'status': post['status'],
        'like_count': post['like_count'],
        # Only store the tags and categories as lists of names
        'tags': extract_names(post.get('tags', {})),
        'categories': extract_names(post.get('categories', {})),
    }

    # Comment count comes from the 'found' field of the replies endpoint.
    processed_post['comment_count'] = _fetch_post_counter(
        f"{site_api}/posts/{post['ID']}/replies/", headers, 'found', timeout
    )
    # View count comes from the 'views' field of the per-post stats endpoint.
    processed_post['views'] = _fetch_post_counter(
        f"{site_api}/stats/post/{post['ID']}", headers, 'views', timeout
    )

    return processed_post


def _fetch_post_counter(url: str, headers: Dict, key: str, timeout: float) -> int:
    """Return ``key`` from the JSON body at ``url``, or 0 if the request fails or the key is absent."""
    try:
        counter_response = requests.get(url, headers=headers, timeout=timeout)
    except requests.RequestException:
        return 0
    if counter_response.status_code != 200:
        return 0
    return counter_response.json().get(key, 0)

def extract_names(metadata_dict: Dict) -> List[str]:
    """
    Return the keys of a tag/category metadata mapping as a list.

    Args:
        metadata_dict (Dict): Mapping whose keys are the names to keep.

    Returns:
        List[str]: The mapping's keys, in the mapping's iteration order.
    """
    return [*metadata_dict.keys()]

def extract_original_post_url(content: str) -> Optional[str]:
    """
    Find the link to the original post inside X-post HTML.

    The content is parsed as HTML and the ``href`` of the last ``<a>`` element
    that has an ``href`` attribute is returned.

    Args:
        content (str): The content of the X-post.

    Returns:
        Optional[str]: The href of the last link, or None when there is no link.
    """
    anchors = BeautifulSoup(content, 'html.parser').find_all('a', href=True)
    return anchors[-1]['href'] if anchors else None


def get_site_and_slug_from_url(url: str) -> Tuple[str, str]:
    """
    Split a WordPress post URL into its host and its post slug.

    The slug is the last segment of the URL path after leading and trailing
    slashes are removed. This covers both dated permalinks such as
    ``https://<site>/YYYY/MM/DD/<post-slug>/`` and any other path layout.

    Args:
        url (str): The URL of the WordPress post.

    Returns:
        Tuple[str, str]: ``(site, slug)``; the slug is an empty string when the
        URL has no path.
    """
    parts = urlparse(url)
    # rpartition yields the text after the final '/', or the whole string if
    # there is none.
    _, _, slug = parts.path.strip('/').rpartition('/')
    return parts.netloc, slug


@rate_limit()
def get_post_by_url(url: str, access_token: str, timeout: float = 30.0) -> Optional[Dict]:
    """
    Fetch post data by URL.

    The URL is split into site and slug, and the post is requested from the
    WordPress.com REST API v1.1 ``posts/slug:<slug>`` endpoint of that site.

    Args:
        url (str): The URL of the WordPress post.
        access_token (str): The access token for API authentication.
        timeout (float): Seconds to wait for the HTTP request. Without a timeout
            a stalled connection would block the caller indefinitely.

    Returns:
        Optional[Dict]: The decoded JSON response if successful, None on a
        non-200 response or a transport error.
    """
    site, slug = get_site_and_slug_from_url(url)
    api_url = f"https://public-api.wordpress.com/rest/v1.1/sites/{site}/posts/slug:{slug}"
    headers = {"Authorization": f"Bearer {access_token}"}

    try:
        response = requests.get(api_url, headers=headers, timeout=timeout)
    except requests.RequestException as exc:
        # Report transport failures the same way as HTTP errors so one bad link
        # does not abort the caller's loop.
        print(f"Error: Unable to fetch post data for URL {url}. Request failed: {exc}")
        return None

    if response.status_code != 200:
        print(f"Error: Unable to fetch post data for URL {url}. HTTP Status Code: {response.status_code}")
        return None

    return response.json()

@rate_limit()
def fetch_new_posts(site: str, access_token: str, processed_urls: Set[str], start_date: datetime, timeout: float = 30.0) -> List[Dict]:
    """
    Fetch new posts that haven't been processed before.

    Pages through the site's post list and builds a full record for every post
    whose URL is not yet in ``processed_urls``. Posts whose title starts with
    ``"X-post:"`` are resolved to the original post they link to, and the
    original is recorded instead. Paging stops at the first short page, at the
    first non-200 response, or on a transport error; posts gathered up to that
    point are still returned.

    Args:
        site (str): The WordPress site domain.
        access_token (str): The access token for API authentication.
        processed_urls (Set[str]): Set of already processed post URLs. It is
            updated in place with every URL processed here.
        start_date (datetime): Only posts after this moment are requested.
        timeout (float): Seconds to wait for each listing request.

    Returns:
        List[Dict]: A list of dictionaries containing new post data.
    """
    page_size = 100  # Used for the request and for end-of-list detection.
    url = f"https://public-api.wordpress.com/rest/v1.1/sites/{site}/posts/"
    headers = {"Authorization": f"Bearer {access_token}"}
    params = {
        'number': page_size,
        'fields': "ID,site_ID,author,date,title,URL,content,excerpt,status,like_count,tags,categories",
        "after": start_date.isoformat()
    }
    all_posts = []
    page = 1

    while True:
        params['page'] = page
        print(f"Fetching page {page} of posts from site '{site}'")
        try:
            response = requests.get(url, headers=headers, params=params, timeout=timeout)
        except requests.RequestException as exc:
            print(f"Error: Unable to fetch posts for site '{site}'. Request failed: {exc}")
            break

        if response.status_code != 200:
            print(f"Error: Unable to fetch posts for site '{site}'. HTTP Status Code: {response.status_code}")
            break

        posts = response.json().get('posts', [])
        for post in posts:
            if post['URL'] in processed_urls:
                continue
            print(f"Processing post: {post['URL']}")

            if not post['title'].startswith("X-post:"):
                processed_post = get_post_data(site, post['ID'], access_token)
                if processed_post:
                    all_posts.append(processed_post)
                    processed_urls.add(post['URL'])
                continue

            # X-post: follow the link to the original post and record that instead.
            original_url = extract_original_post_url(post['content'])
            if not original_url or original_url in processed_urls:
                continue
            original_post = get_post_by_url(original_url, access_token)
            if not original_post:
                continue

            # The link may be spelled differently from the post's own URL, so
            # also check the URL the API reports for it.
            canonical_url = original_post.get('URL')
            if canonical_url and canonical_url in processed_urls:
                processed_urls.add(original_url)
                continue

            # Use the original's domain, not its numeric site_ID, so that
            # 'site_domain' holds a domain as it does for regular posts. The
            # same domain was just accepted by the API in get_post_by_url.
            original_site = urlparse(original_url).netloc
            processed_post = get_post_data(original_site, original_post['ID'], access_token)
            if processed_post:
                all_posts.append(processed_post)
                processed_urls.add(original_url)
                processed_urls.add(processed_post['URL'])

        if len(posts) < page_size:  # Short page: we've reached the end
            break
        page += 1

    return all_posts

In [61]:
def get_all_posts_for_site(site: str, access_token: str, processed_urls: Set[str], usernames: List[str], lookback_days: int = 3 * 365) -> List[Dict]:
    """
    Fetch all posts from a specific site, including previously processed posts.

    Posts already saved for the site are loaded from disk, newly published
    posts are fetched from the API, and the combined list is written back to
    disk (per site, per author and per cross-post domain).

    Args:
        site (str): The WordPress site domain.
        access_token (str): The access token for API authentication.
        processed_urls (Set[str]): Set of already processed post URLs. It is
            updated in place with the URLs loaded from disk and those fetched.
        usernames (List[str]): List of usernames to filter posts by.
        lookback_days (int): How far back, in days, to ask the API for posts.
            Defaults to three years.

    Returns:
        List[Dict]: A list of dictionaries containing post data.
    """
    # Load previously processed posts
    all_posts = load_posts_from_disk(site)

    # Add URLs of loaded posts to processed_urls
    processed_urls.update(post['URL'] for post in all_posts)

    # Only retrieve posts from the lookback window. No need to go back further.
    start_date = datetime.now() - timedelta(days=lookback_days)

    new_posts = fetch_new_posts(site, access_token, processed_urls, start_date)
    all_posts.extend(new_posts)

    # Save all posts to disk. save_posts_to_disk finishes by calling
    # handle_cross_posts with the same posts and site, so a second call here
    # would only repeat the same file writes.
    save_posts_to_disk(all_posts, site, usernames)

    return all_posts

In [None]:
# Site domains to crawl and the author logins whose posts also get their own files.
sites = ["site1.com", "site2.com", "site3.com", "site4.com", "site5.com"]
usernames = ["user1", "user2", "user3", "user4", "user5"]
# One set shared by every site: get_all_posts_for_site adds to it in place, so a
# URL handled for one site is skipped when it shows up again for a later one.
processed_urls = set()

# Crawl the sites one after another; each call also writes its results to disk.
for site in sites:
    posts = get_all_posts_for_site(site, ACCESS_TOKEN, processed_urls, usernames)
    print(f"Total posts for site {site}: {len(posts)}")