# Grab All Patreon Post URLs

In [4]:
import re
import json
import requests
import csv
import random
from datetime import datetime
from bs4 import BeautifulSoup

# Posts page of the Patreon creator; fetched to discover the campaign ID and
# also sent as the Referer of every request.
url = 'https://www.patreon.com/joytactics/posts'
# Endpoint queried (page by page) for the campaign's post listing.
api_url = 'https://www.patreon.com/api/posts'
# Headers attached to every HTTP request made in this cell. The User-Agent is a
# desktop Firefox string (presumably so Patreon answers as it would to a
# browser -- confirm).
headers = {
    'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:78.0) Gecko/20100101 Firefox/78.0',
    'Accept-Language': 'en-US,en;q=0.5',
    'Referer': url
}

def fetch_campaign_id(session, url):
    """Look up the numeric campaign ID referenced by a Patreon page.

    The page at ``url`` is downloaded through ``session`` using the
    module-level ``headers``, parsed with BeautifulSoup, re-serialised to text
    and searched for the first ``https://www.patreon.com/api/campaigns/<digits>``
    link.

    Args:
        session: Object exposing a ``requests``-style ``get(url, headers=...)``.
        url: Address of the page to inspect.

    Returns:
        The campaign ID digits as a string, or ``None`` when the page contains
        no such link.
    """
    page = session.get(url, headers=headers)
    markup = str(BeautifulSoup(page.text, 'html.parser'))
    found = re.search(r'https://www\.patreon\.com/api/campaigns/(\d+)', markup)
    return found.group(1) if found else None

def fetch_posts(session, campaign_id, cursor=None, timeout=30):
    """Fetch one page of a campaign's posts from the Patreon posts endpoint.

    Args:
        session: Object exposing a ``requests``-style ``get`` method.
        campaign_id: Campaign whose posts are requested.
        cursor: Pagination cursor of the page to fetch; ``None`` (or any falsy
            value) requests the first page.
        timeout: Seconds to wait for the server before giving up, so a stalled
            connection cannot hang the scrape forever.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    params = {
        'filter[campaign_id]': campaign_id,
        'filter[contains_exclusive_posts]': 'true',
        'sort': '-published_at'
    }
    if cursor:
        params['page[cursor]'] = cursor
    response = session.get(api_url, headers=headers, params=params, timeout=timeout)
    # Fail here, with the real HTTP status, instead of letting an error page
    # surface later as a JSON decoding error or a missing-key error.
    response.raise_for_status()
    return response.json()

def clean_title(title):
    """Convert a post title into a lowercase, underscore-separated slug.

    Steps, in order: drop every ``[...]`` group, keep only the text before the
    first ``|``, turn spaces and hyphens into underscores, discard every
    character that is not an ASCII letter, digit or underscore, strip trailing
    underscores, and lowercase the result. Leading underscores and runs of
    underscores are kept.

    Args:
        title: The title text to convert.

    Returns:
        The slug; may be an empty string.
    """
    without_tags = re.sub(r'\[.*?\]', '', title)
    before_pipe = re.split(r'\|', without_tags)[0]
    underscored = before_pipe.replace(' ', '_').replace('-', '_')
    slug = re.sub(r'[^a-zA-Z0-9_]', '', underscored).rstrip('_')
    return slug.lower()

def parse_and_save_posts(data, rows):
    """Append one CSV row per post found in an API response page.

    Each appended row is
    ``[episode_id, date_posted, raw_title, formatted_title, patreon_url]``.
    Nothing is written to disk here; ``rows`` is extended in place.

    A post that lacks a required attribute, or whose title or timestamp cannot
    be processed (for example a null title, or a ``published_at`` value that
    does not match ``%Y-%m-%dT%H:%M:%S.%f%z``), is reported on stdout and
    skipped so that one bad post does not abort the whole run.

    Args:
        data: Decoded response page; posts are read from its ``'data'`` list
            (a missing key is treated as no posts).
        rows: List that receives the generated rows.
    """
    for post in data.get('data', []):
        try:
            attributes = post['attributes']
            title = attributes['title']
            published_at = attributes['published_at']
            patreon_url = attributes['url']

            # Convert the timestamp first so a malformed value rejects the
            # post before any other work is done for it.
            date_posted = datetime.strptime(
                published_at, '%Y-%m-%dT%H:%M:%S.%f%z'
            ).strftime('%Y-%m-%d %H:%M:%S')

            # Human-readable title: letters, digits and spaces only
            raw_title = re.sub(r'[^A-Za-z0-9 ]+', '', title).strip()

            # Slug form of the title
            formatted_title = clean_title(title)

            # NOTE(review): the ID is a random 4-digit number, so it can repeat
            # between posts and differs from run to run.
            episode_id = random.randint(1000, 9999)

            rows.append([episode_id, date_posted, raw_title, formatted_title, patreon_url])
        except KeyError as e:
            print(f"KeyError: {e} in post: {post}")
        except (TypeError, ValueError) as e:
            print(f"Skipping post with malformed data ({e}): {post}")

def save_to_csv(rows, csv_file="data/raw_episodes.csv"):
    """Write the collected episode rows to a CSV file with a header line.

    The target directory is created when it does not exist yet, so a missing
    ``data/`` folder no longer discards the scraped rows. Any I/O failure is
    reported on stdout rather than raised.

    Args:
        rows: Iterable of rows, each ordered as ``episode_id, date_posted,
            raw_title, formatted_title, patreon_url``.
        csv_file: Destination path; an existing file is overwritten.
    """
    import os  # local import: the surrounding import block does not provide it

    csv_columns = ['episode_id', 'date_posted', 'raw_title', 'formatted_title', 'patreon_url']

    try:
        target_dir = os.path.dirname(csv_file)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
        with open(csv_file, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(csv_columns)
            writer.writerows(rows)
        print(f"{os.path.basename(csv_file)} created successfully.")
    except IOError as e:
        print(f"I/O error: {e}")

# Walk every page of the campaign's post listing and export the collected rows
with requests.Session() as s:
    campaign_id = fetch_campaign_id(s, url)
    if not campaign_id:
        print("Campaign ID not found.")
    else:
        rows = []
        cursor = None
        while True:
            data = fetch_posts(s, campaign_id, cursor)
            parse_and_save_posts(data, rows)
            # A missing or empty 'next' cursor ends the paging loop
            cursor = data['meta']['pagination']['cursors'].get('next')
            if not cursor:
                break
        save_to_csv(rows)

print("done")


raw_episodes.csv created successfully.
done


# Tag Patreon Episodes as Video and Bonus Episodes

In [5]:
import csv
import re
from datetime import datetime
import pandas as pd

# CSV read as the source of candidate rows for the tagging step below
input_file = 'data/raw_episodes.csv'
# CSV holding the tagged rows; read first when it exists, then overwritten
output_file = 'data/episodes.csv'

# Function to decide whether a title marks a video post
def check_video(title):
    """Return True when ``title`` mentions 'video' but not 'audio'.

    The comparison ignores letter case.
    """
    lowered = title.lower()
    if 'audio' in lowered:
        return False
    return 'video' in lowered

# Function to flag posts that were NOT published on a Monday
def check_bonus(date_str):
    """Return True when ``date_str`` falls on any weekday other than Monday.

    ``date_str`` must be formatted as ``YYYY-MM-DD HH:MM:SS``.
    NOTE(review): presumably regular episodes are posted on Mondays and
    everything else counts as a bonus -- confirm.
    """
    # isoweekday() numbers Monday as 1
    return datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S').isoweekday() != 1

# Function to build the transcript_title value for one episode row
def generate_transcript_title(row):
    """Return ``<YYYYMMDD>_<formatted_title>`` for the given row.

    ``row`` is any mapping (dict or pandas Series) providing ``date_posted``
    and ``formatted_title``; the date is parsed with ``pd.to_datetime``.
    """
    posted = pd.to_datetime(row['date_posted'])
    date = posted.strftime('%Y%m%d')
    return f"{date}_{row['formatted_title']}"

# Read the existing episodes.csv (if any) so rows tagged earlier are preserved
existing_rows = []
try:
    with open(output_file, newline='', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile)
        existing_rows = list(reader)
        existing_fieldnames = reader.fieldnames
except FileNotFoundError:
    print(f"{output_file} not found. A new file will be created.")
    existing_fieldnames = None

# Read the new rows from raw_episodes.csv
with open(input_file, newline='', encoding='utf-8') as csvfile:
    reader = csv.DictReader(csvfile)
    new_rows = list(reader)
    raw_fieldnames = reader.fieldnames

# Initialize existing_fieldnames if it was not found
if existing_fieldnames is None:
    existing_fieldnames = raw_fieldnames + ['video', 'bonus']

# Drop repeated column names while keeping their order. A file written by an
# earlier run can already contain 'transcript_title' (even more than once), and
# repeating it would multiply that column on every run.
existing_fieldnames = list(dict.fromkeys(existing_fieldnames))

# Dates already present with 'video' and 'bonus' filled in. A set replaces the
# linear scan over existing_rows that was done for every new row.
seen_dates = {
    r['date_posted'] for r in existing_rows if 'video' in r and 'bonus' in r
}

# Tag and append only the rows whose date is not present yet
for row in new_rows:
    if row['date_posted'] in seen_dates:
        continue

    row['video'] = check_video(row['raw_title'])
    row['bonus'] = check_bonus(row['date_posted'])
    existing_rows.append(row)
    seen_dates.add(row['date_posted'])

# Write the combined data to episodes.csv
with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=existing_fieldnames)
    writer.writeheader()
    writer.writerows(existing_rows)

print("Updated episodes.csv successfully.")

# Now, load the episodes.csv with pandas to add the transcript_title column
df = pd.read_csv(output_file)

# Add (or recompute) the transcript_title column for every row
df['transcript_title'] = df.apply(generate_transcript_title, axis=1)

# Keep the original column order; append 'transcript_title' only when the file
# did not have it yet, so the column is never written twice.
final_columns = list(existing_fieldnames)
if 'transcript_title' not in final_columns:
    final_columns.append('transcript_title')
df = df[final_columns]

# Save the modified DataFrame back to CSV
df.to_csv(output_file, index=False)

print("Added transcript_title column and saved to episodes.csv.")
print("done")

Updated episodes.csv successfully.
Added transcript_title column and saved to episodes.csv.
done


# Enrich with YouTube URLs Using Selenium

In [7]:
import csv
import re
import time
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from tqdm import tqdm

import os  # used below to check whether the configured chromedriver exists

# CSV that is read and updated in place with the extracted YouTube URLs
input_file = 'data/episodes.csv'
# Optional, machine-specific chromedriver binary
chromedriver_path = '/Users/isaacstevens/Downloads/chromedriver-mac-arm64/chromedriver'

# Set up the web driver
chrome_options = Options()
chrome_options.add_argument("--incognito")  # Open in incognito mode
# Chrome expects the window size as "width,height"
chrome_options.add_argument("--window-size=1920,1080")
# Use the pinned chromedriver only when it is really there. Passing a path that
# does not exist makes Selenium raise NoSuchDriverException; a Service without
# a path lets Selenium locate a driver on its own.
if os.path.isfile(chromedriver_path):
    service = Service(chromedriver_path)
else:
    service = Service()
driver = webdriver.Chrome(service=service, options=chrome_options)

# Function to log into Patreon
def login_to_patreon(driver):
    """Sign in to Patreon through the two-step email/password login form.

    Credentials are taken from the ``PATREON_EMAIL`` and ``PATREON_PASSWORD``
    environment variables so they do not have to be stored in this file; the
    original in-source placeholders are used when a variable is not set.

    Args:
        driver: Selenium WebDriver that performs the login.

    Raises:
        selenium.common.exceptions.TimeoutException: If an expected element
            does not show up within 10 seconds.
    """
    import os  # local import: the surrounding import block does not provide it

    driver.get("https://www.patreon.com/login")

    # Define your login credentials (environment variables take precedence)
    patreon_email = os.environ.get("PATREON_EMAIL", "<YOUR-EMAIL")
    patreon_password = os.environ.get("PATREON_PASSWORD", "<YOUR-PATREON_PASSWORD")

    # Wait for the email input field to be present
    email_input = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.NAME, "email"))
    )
    email_input.send_keys(patreon_email)

    # Find and click the continue button
    # NOTE(review): 'hAAykn' looks like a generated CSS class name and may
    # change whenever the site is redeployed -- confirm.
    continue_button = WebDriverWait(driver, 10).until(
        EC.element_to_be_clickable((By.XPATH, "//button[contains(@class, 'hAAykn') and contains(., 'Continue')]"))
    )
    continue_button.click()

    # Wait for the password input field to be present
    password_input = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.NAME, "current-password"))
    )
    password_input.send_keys(patreon_password)
    password_input.send_keys(Keys.RETURN)

    # Wait for the login process to complete
    # NOTE(review): this matches any div whose class contains 'sc', so it may
    # succeed before the login has actually finished -- confirm.
    WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.XPATH, "//div[contains(@class, 'sc')]"))
    )

    # Add a brief wait to ensure the page has fully loaded
    time.sleep(5)

# Function to pull the YouTube link out of a Patreon post page
def extract_youtube_url(driver, patreon_url, formatted_title):
    """Open a Patreon post and return the href of its "Watch on YouTube" link.

    After loading ``patreon_url`` the driver switches into an ``iframe`` found
    on the page and looks there for an anchor titled "Watch on YouTube". Each
    lookup waits up to 10 seconds. The driver is not switched back out of the
    iframe before returning.

    Args:
        driver: Selenium WebDriver used for navigation.
        patreon_url: Address of the post to open.
        formatted_title: Episode name, used only in the progress message.

    Returns:
        The link's ``href`` value, or an empty string when any step raises
        (the error is printed).
    """
    driver.get(patreon_url)
    try:
        wait = WebDriverWait(driver, 10)
        embed = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "iframe")))
        driver.switch_to.frame(embed)
        link = wait.until(
            EC.presence_of_element_located((By.XPATH, "//a[@title='Watch on YouTube']"))
        )
        youtube_url = link.get_attribute("href")
        print(f"Extracted YouTube URL for {formatted_title}: {youtube_url}")
        return youtube_url
    except Exception as e:
        print(f"Error extracting YouTube URL for {patreon_url}: {e}")
        return ""

# Everything that uses the browser runs inside try/finally so the Chrome
# session is always shut down, even when reading the CSV, logging in or
# processing an episode raises.
try:
    # Read the CSV file and process the data
    with open(input_file, newline='', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile)
        rows = list(reader)

        # Add 'youtube_url' column if not present
        if 'youtube_url' not in reader.fieldnames:
            for row in rows:
                row['youtube_url'] = ''
            fieldnames = reader.fieldnames + ['youtube_url']
        else:
            fieldnames = reader.fieldnames

    # Log into Patreon
    login_to_patreon(driver)

    # Process the rows with video and no YouTube URL using tqdm progress bar
    for row in tqdm(rows, desc="Processing episodes"):
        if row['video'] != 'True' or row['youtube_url']:
            continue

        print(f"Executing episode: {row['formatted_title']}")
        youtube_url = extract_youtube_url(driver, row['patreon_url'], row['formatted_title'])
        if youtube_url:
            row['youtube_url'] = youtube_url

            # Save the updated data back to the same CSV file immediately, so
            # progress survives a crash later in the run
            with open(input_file, 'w', newline='', encoding='utf-8') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerows(rows)

            print(f"Updated CSV with YouTube URL for {row['formatted_title']}")
finally:
    # Close the web driver
    driver.quit()
print("done")


NoSuchDriverException: Message: Unable to obtain driver for chrome; For documentation on this error, please visit: https://www.selenium.dev/documentation/webdriver/troubleshooting/errors/driver_location
