In [None]:
import base64
import os
import time
from typing import Dict, List, Literal, Optional

import requests
from dotenv import load_dotenv

In [None]:
Post_Status = Literal["publish", "private", "draft", "pending", "future"]


class WordPressManager:
    def __init__(self, site_url: str, username: str, password: str):
        self.site_url = site_url.rstrip("/")
        self.api_base = f"{self.site_url}/wp-json/wp/v2"

        # Create authentication header
        credentials = f"{username}:{password}"
        token = base64.b64encode(credentials.encode()).decode()
        self.headers = {
            "Authorization": f"Basic {token}",
            "Content-Type": "application/json",
        }

    def test_connection(self) -> bool:
        """
        Use this method to test the connection to the WordPress site.
        Returns `True` if the instance can connect to the WordPress site,
        `False` otherwise.
        """
        try:
            response = requests.get(f"{self.api_base}/users/me", headers=self.headers)
            if response.status_code == 200:
                print("✅ Successfully connected to WordPress!")
                return True
            else:
                print(f"❌ Connection failed: {response.status_code} - {response.text}")
                return False
        except Exception as e:
            print(f"❌ Connection error: {str(e)}")
            return False

    def get_category_id(self, category_name: str) -> Optional[int]:
        """Get category ID by name."""
        url = f"{self.api_base}/categories"
        params = {"search": category_name, "per_page": 100}

        try:
            response = requests.get(url, params=params, headers=self.headers)
            response.raise_for_status()

            categories = response.json()
            for category in categories:
                if category["name"].lower() == category_name.lower():
                    return category["id"]

            print(f"Category '{category_name}' not found.")
            return None

        except requests.exceptions.RequestException as e:
            print(f"Error fetching categories: {e}")
            return None

    def get_posts_by_category(
        self, category_id: int, status: str = "any"
    ) -> List[Dict]:
        """
        Get all posts in a specific category.

        Args:
            category_id: ID of the category
            status: Post status filter ('publish', 'private', 'any')
        """
        posts = []
        page = 1
        per_page = 100

        while True:
            url = f"{self.api_base}/posts"
            params = {
                "categories": category_id,
                "status": status,
                "per_page": per_page,
                "page": page,
            }

            try:
                response = requests.get(url, params=params, headers=self.headers)
                response.raise_for_status()

                batch_posts = response.json()
                if not batch_posts:
                    break

                posts.extend(batch_posts)
                page += 1

                # Check if we've got all posts (less than per_page means last page)
                if len(batch_posts) < per_page:
                    break

            except requests.exceptions.RequestException as e:
                print(f"Error fetching posts: {e}")
                break

        return posts

    def update_post_status(self, post_id: int, new_status: Post_Status) -> bool:
        """
        Update a post's status.

        Args:
            post_id (int): The ID of the post to update.
            new_status (Post_Status): The new status to set for the post — typically 'publish', 'private',
                or 'draft', etc.

        Returns:
            bool: True if the update was successful, False otherwise.
        """

        url = f"{self.api_base}/posts/{post_id}"
        data = {"status": new_status}

        try:
            response = requests.post(url, json=data, headers=self.headers)
            response.raise_for_status()
            return True

        except requests.exceptions.RequestException as e:
            print(f"Error updating post {post_id}: {e}")
            return False

    def change_category_posts_from_publish_to_private(self, category_name: str) -> int:
        """
        Change all public posts in a given category to private.

        Args:
            category_name: The name of the category whose status will be changed
                from publish to private.

        Returns:
            int: The number of posts whose status was changed from publish to private.
        """

        print(f"Getting '{category_name}' category ID...")
        category_id = self.get_category_id(category_name)

        if not category_id:
            return 0

        print(f"Found '{category_name}' category with ID: {category_id}")
        print(f"Fetching published posts in '{category_name}' category...")

        posts = self.get_posts_by_category(category_id, "publish")

        if not posts:
            print(f"No published posts found in '{category_name}' category.")
            return 0

        print(f"Found {len(posts)} published posts in '{category_name}' category.")
        changed_count = 0

        for post in posts:
            post_id = post["id"]
            post_title = post["title"]["rendered"]

            print(f"Making private: '{post_title}' (ID: {post_id})")

            if self.update_post_status(post_id, "private"):
                changed_count += 1
                print(f"✓ Successfully made private")
            else:
                print(f"✗ Failed to update")

            # Small delay to be respectful to the server
            time.sleep(0.5)

        print(f"\nCompleted: {changed_count} posts made private.")
        return changed_count

    def change_category_posts_from_private_to_publish(self, category_name: str) -> int:
        """
        Change all private posts in a given category to public.

        Args:
            category_name: The name of the category whose status will be changed
                from private to publish.

        Returns:
            int: The number of posts whose status was changed from private to publish.
        """

        print(f"Getting '{category_name}' category ID...")
        category_id = self.get_category_id(category_name)

        if not category_id:
            return 0

        print(f"Found '{category_name}' category with ID: {category_id}")
        print(f"Fetching private posts in '{category_name}' category...")

        posts = self.get_posts_by_category(category_id, "private")

        if not posts:
            print(f"No private posts found in '{category_name}' category.")
            return 0

        print(f"Found {len(posts)} private posts in '{category_name}' category.")
        changed_count = 0

        for post in posts:
            post_id = post["id"]
            post_title = post["title"]["rendered"]

            print(f"Making public: '{post_title}' (ID: {post_id})")

            if self.update_post_status(post_id, "publish"):
                changed_count += 1
                print(f"✓ Successfully made public")
            else:
                print(f"✗ Failed to update")

            # Small delay to be respectful to the server
            time.sleep(0.5)

        print(f"\nCompleted: {changed_count} posts made public.")
        return changed_count

    def get_category_posts_status_count(self, category_name: str) -> Dict[Post_Status, int]:
        """
        Get the count of posts grouped by status for a given category.
    
        Args:
            category_name (str): The name of the category to analyze.
            
        Returns:
            Dict[Post_Status, int]: A dictionary mapping each post status to its count.
                Returns an empty dictionary if the category doesn't exist.
                                
        Example:
            >>> blog.get_category_posts_status_count("Technology")
            {'published': 15, 'draft': 3, 'archived': 2}
        """
        
        category_id = self.get_category_id(category_name)

        if not category_id:
            return {}

        all_posts = self.get_posts_by_category(category_id, "any")
        status_count = {}

        for post in all_posts:
            status = post["status"]
            status_count[status] = status_count.get(status, 0) + 1

        return status_count

In [None]:
SITE_URL = "https://joeydevilla.com"
load_dotenv()
USERNAME = os.getenv("ACCORDION_GUY_WORDPRESS_USERNAME")
PASSWORD = os.getenv("ACCORDION_GUY_WORDPRESS_APP_PASSWORD")  # Use application password, not regular password

blog = WordPressManager(SITE_URL, USERNAME, PASSWORD)
blog.test_connection()