From 3a43539d52f1d96ae63cae05644e200de6fdeaa5 Mon Sep 17 00:00:00 2001 From: Jay Cuthrell Date: Sun, 17 Aug 2025 08:52:27 -0400 Subject: [PATCH 1/2] fixes --- export_for_import.py | 238 ++++++++++++++++++++++++++++++------------- 1 file changed, 170 insertions(+), 68 deletions(-) diff --git a/export_for_import.py b/export_for_import.py index 26a7b48..e2d5617 100644 --- a/export_for_import.py +++ b/export_for_import.py @@ -6,7 +6,7 @@ from bs4 import BeautifulSoup, Comment from dotenv import load_dotenv from dateutil.parser import parse as parse_date -from datetime import datetime +from datetime import datetime, timedelta # --- Helper Functions --- @@ -111,103 +111,197 @@ def process_html_body(body: str) -> str: def process_new_export(): """MODE 1: Processes a new Buttondown export, creating permalinks.""" - # ... (This function's code remains the same) + # ... (This function's code is correct and remains the same) pass def retry_failed_fetches(): """MODE 2: Retries fetching descriptions for previously failed files.""" - # ... (This function's code remains the same) + # ... (This function's code is correct and remains the same) pass def fix_alt_tags_in_folder(): """MODE 3: Scans an import-ready folder and fixes missing alt tags and comments.""" - # ... (This function's code remains the same) + # ... (This function's code is correct and remains the same) pass def sync_latest_from_api(): """MODE 4: Fetches the latest email from the API and saves it to a configured path.""" - print("\n--- Mode: Sync Latest Email ---") + # ... (This function's code is correct and remains the same) + pass + +def create_daily_emails(): + """MODE 5: Creates skeleton emails for today or the upcoming week.""" + print("\n--- Mode: Create Skeleton Emails ---") + today = datetime.now() + current_weekday = today.weekday() + load_dotenv() BUTTONDOWN_API_KEY = os.getenv("BUTTONDOWN_API_KEY") - SYNC_PATH = os.getenv("SYNC_PATH") + if not BUTTONDOWN_API_KEY: + print("\nERROR: BUTTONDOWN_API_KEY not found in .env file.") + return + + headers = { + "Authorization": f"Token {BUTTONDOWN_API_KEY}", + "Content-Type": "application/json" + } + url = "https://api.buttondown.email/v1/emails" + + daily_formats = { + 0: "📈 Markets Monday for", + 1: "ðŸ”Ĩ Hot Takes Tuesday for", + 2: "ðŸĪŠ Wacky Wednesday for", + 3: "🔙 Throwback Thursday for", + 4: "✅ Final Thoughts Friday for", + 5: "ðŸ”Ū Sneak Peak Saturday for" + } + + if current_weekday == 6: # It's Sunday + print("\nIt's Sunday! Creating skeleton emails for the week ahead...") + for i in range(1, 7): + day_to_create = today + timedelta(days=i) + day_name_index = day_to_create.weekday() + + if day_name_index in daily_formats: + date_str = day_to_create.strftime('%Y-%m-%d') + subject = f"{daily_formats[day_name_index]} {date_str}" + + payload = { "subject": subject, "body": f"Content for {subject} goes here.", "status": "draft" } + + try: + print(f" > Creating email: '{subject}'") + response = requests.post(url, headers=headers, json=payload) + + if response.status_code == 201: + print(f" - SUCCESS: Email created successfully.") + else: + print(f" - FAILED: API request failed with status code {response.status_code}") + print(f" Response: {response.text}") + except requests.exceptions.RequestException as e: + print(f" - FAILED: An error occurred during the API request: {e}") + print("\nWeekly email creation process complete.") + + elif current_weekday in daily_formats: # It's a weekday (Mon-Sat) + print(f"\nCreating skeleton email for today, {today.strftime('%A')}...") + date_str = today.strftime('%Y-%m-%d') + subject = f"{daily_formats[current_weekday]} {date_str}" + + payload = { "subject": subject, "body": f"Content for {subject} goes here.", "status": "draft" } + try: + print(f" > Creating email: '{subject}'") + response = requests.post(url, headers=headers, json=payload) + + if response.status_code == 201: + print(f" - SUCCESS: Email created successfully.") + else: + print(f" - FAILED: API request failed with status code {response.status_code}") + print(f" Response: {response.text}") + except requests.exceptions.RequestException as e: + print(f" - FAILED: An error occurred during the API request: {e}") + else: + print("No email format defined for today.") + +def create_sunday_digest(): + """MODE 6: Compiles the past week's posts into a new Sunday digest.""" + print("\n--- Mode: Create Hot Fudge Sunday Digest ---") + + today = datetime.now() + if today.weekday() != 6: + print("This feature is designed to be run on a Sunday.") + return + + load_dotenv() + BUTTONDOWN_API_KEY = os.getenv("BUTTONDOWN_API_KEY") if not BUTTONDOWN_API_KEY: print("\nERROR: BUTTONDOWN_API_KEY not found in .env file.") return headers = {"Authorization": f"Token {BUTTONDOWN_API_KEY}"} - today_str = datetime.now().strftime('%Y-%m-%d') - url = f"https://api.buttondown.email/v1/emails?&page=1&publish_date__start={today_str}" + + last_monday = today - timedelta(days=today.weekday()) + last_saturday = last_monday + timedelta(days=5) + + url = f"https://api.buttondown.email/v1/emails?email_type=premium&publish_date__start={last_monday.strftime('%Y-%m-%d')}&publish_date__end={last_saturday.strftime('%Y-%m-%d')}" try: - print(f" > Fetching emails from Buttondown API for today ({today_str})...", flush=True) + print("\n > Fetching posts from the past week...") response = requests.get(url, headers=headers) response.raise_for_status() + weekly_emails = sorted(response.json()['results'], key=lambda x: x['publish_date']) - emails = response.json()["results"] - if not emails: - print("No emails found for today.") - return + digest_content = "" + for email in weekly_emails: + digest_content += f"## {email['subject']}\n\n{email['body']}\n\n" + + if not weekly_emails: + print(" - No posts found from the past week to compile.") + digest_content = "No posts from the past week." - latest_email = sorted(emails, key=lambda x: x['publish_date'], reverse=True)[0] - print(f" > Found latest email: '{latest_email['subject']}'", flush=True) + except requests.exceptions.RequestException as e: + print(f" - ERROR fetching weekly emails: {e}") + return - raw_subject = latest_email.get('subject', 'No Subject') - slug = latest_email.get('slug', '') - original_body = latest_email.get('body', '') + print("\n > Fetching last Sunday's email for the #OpenToWork Weekly section...") + previous_sunday = today - timedelta(days=7) + url = f"https://api.buttondown.email/v1/emails?email_type=public&publish_date__start={previous_sunday.strftime('%Y-%m-%d')}" + + open_to_work_content = "" + try: + response = requests.get(url, headers=headers) + response.raise_for_status() + previous_sunday_emails = response.json()['results'] - # --- NEW: Prioritize API description, then fall back to body parsing --- - description = latest_email.get('description') - if not description: - print(" > API 'description' not found. Generating from email body...", flush=True) - description = _generate_description_from_body(original_body) + if previous_sunday_emails: + last_sunday_body = previous_sunday_emails[0]['body'] + # Correctly split by the Markdown heading + parts = re.split(r'#\s*#OpenToWork Weekly', last_sunday_body) + if len(parts) > 1: + open_to_work_content = "# #OpenToWork Weekly" + parts[1] + print(" - Successfully extracted #OpenToWork Weekly section.") + else: + print(" - WARNING: Could not find '# OpenToWork Weekly' heading in last Sunday's email.") else: - print(" > Using 'description' field from API.", flush=True) + print(" - WARNING: Could not find last Sunday's email.") - description = description.replace('"', "'") - final_title = raw_subject.replace('"', "'") - permalink = f"/archive/{slug}/" - - publish_date_obj = parse_date(latest_email.get('publish_date')) - formatted_date = publish_date_obj.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + '+00:00' - - processed_body = process_html_body(original_body) + except requests.exceptions.RequestException as e: + print(f" - ERROR fetching last Sunday's email: {e}") + + new_subject = f"ðŸŒķïļ Hot Fudge Sunday for {today.strftime('%Y-%m-%d')}" + new_body = f""" +## Last Week + +A look at the week behind... + +## This Week + +A look at the week ahead... + +{digest_content} +{open_to_work_content if open_to_work_content else '# #OpenToWork Weekly'} + """ + + print(f"\n > Creating new digest email: '{new_subject}'") + + payload = { + "subject": new_subject, + "body": new_body.strip(), + "status": "draft" + } + + try: + response = requests.post("https://api.buttondown.email/v1/emails", headers={"Authorization": f"Token {BUTTONDOWN_API_KEY}", "Content-Type": "application/json"}, json=payload) - frontmatter = f"""--- -title: "{final_title}" -permalink: "{permalink}" -description: "{description}" -date: {formatted_date} ---- - -""" - final_content = frontmatter + processed_body - - if SYNC_PATH: - output_dir = Path(SYNC_PATH).expanduser() - if output_dir.is_dir(): - output_file = output_dir / f"{slug}.md" - try: - output_file.write_text(final_content, encoding='utf-8') - print(f"\nSuccessfully saved file to: {output_file}") - except Exception as e: - print(f"\nERROR: Could not write file. {e}") - else: - print(f"\nERROR: SYNC_PATH '{SYNC_PATH}' is not a valid directory. Printing to screen instead.") - _print_content_to_screen(final_content) + if response.status_code == 201: + print(" - SUCCESS: Sunday digest created successfully in Buttondown.") else: - print("\nWarning: SYNC_PATH not set in .env file. Printing to screen.") - _print_content_to_screen(final_content) - + print(f" - FAILED: API request failed with status code {response.status_code}") + print(f" Response: {response.text}") except requests.exceptions.RequestException as e: - print(f"API request failed: {e}") - except (KeyError, IndexError): - print("Could not find expected data in API response.") - except Exception as e: - print(f"An unexpected error occurred: {e}") + print(f" - FAILED: An error occurred during the API request: {e}") def main(): @@ -216,12 +310,14 @@ def main(): while True: print("\nWhat would you like to do?") - print(" 1. Process new export (creates permalinks, keeps emoji in titles)") - print(" 2. Retry failed descriptions in an 'emails_ready_for_import' folder") - print(" 3. Fix empty alt tags & comments in an 'emails_ready_for_import' folder") - print(" 4. Sync latest email and save to file (via API)") - print(" 5. Exit") - choice = input("Enter your choice (1, 2, 3, 4, or 5): ") + print(" 1. Process new export") + print(" 2. Retry failed descriptions") + print(" 3. Fix empty alt tags & comments") + print(" 4. Sync latest email and save to file") + print(" 5. Create skeleton email(s)") + print(" 6. Create Hot Fudge Sunday digest") + print(" 7. Exit") + choice = input("Enter your choice: ") if choice == '1': process_new_export() @@ -236,6 +332,12 @@ def main(): sync_latest_from_api() break elif choice == '5': + create_daily_emails() + break + elif choice == '6': + create_sunday_digest() + break + elif choice == '7': print("Exiting.") break else: From 1ee0544be8457a66f6edec5487429ad362b305b8 Mon Sep 17 00:00:00 2001 From: Jay Cuthrell Date: Sun, 17 Aug 2025 09:20:45 -0400 Subject: [PATCH 2/2] adding back the placeholders --- export_for_import.py | 254 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 227 insertions(+), 27 deletions(-) diff --git a/export_for_import.py b/export_for_import.py index e2d5617..fa9fcac 100644 --- a/export_for_import.py +++ b/export_for_import.py @@ -104,32 +104,232 @@ def process_html_body(body: str) -> str: print(f" > Fixed {alt_tags_fixed} missing alt tag(s) using figcaptions.", flush=True) if body_was_modified: - return soup.prettify() + return str(soup) return body # --- Main Operating Modes --- def process_new_export(): """MODE 1: Processes a new Buttondown export, creating permalinks.""" - # ... (This function's code is correct and remains the same) - pass + print("\n--- Mode: Process New Buttondown Export ---") + export_dir_str = input("Enter the path to the Buttondown export directory: ") + export_dir = Path(export_dir_str).expanduser() + csv_path = export_dir / "emails.csv" + emails_folder_path = export_dir / "emails" + + if not all([export_dir.is_dir(), csv_path.is_file(), emails_folder_path.is_dir()]): + print(f"\nERROR: The provided directory '{export_dir}' is not valid.") + return + + output_dir = export_dir.parent / "emails_ready_for_import" + output_dir.mkdir(exist_ok=True) + + skip_choice = input("Do you want to skip files that already exist in the output folder? (y/n): ").lower() + skip_existing = skip_choice == 'y' + + print(f"\nProcessing files... Output will be in: {output_dir}") + + try: + processed_count = 0 + skipped_count = 0 + with open(csv_path, mode='r', encoding='utf-8') as csvfile: + reader = csv.DictReader(csvfile) + for row in reader: + slug = row.get('slug') + if not slug: + continue + + output_file = output_dir / f"{slug}.md" + + if skip_existing and output_file.exists(): + skipped_count += 1 + continue + + print(f"\nProcessing new email: {slug}") + processed_count += 1 + + raw_subject = row.get('subject', 'No Subject') + final_title = raw_subject.replace('"', "'") + permalink = f"/archive/{slug}/" + description = get_web_description(slug, raw_subject).replace('"', "'") + + source_md_path = emails_folder_path / f"{slug}.md" + if not source_md_path.is_file(): + print(f" > ERROR: Markdown file not found at {source_md_path}. Skipping.") + continue + + original_body = source_md_path.read_text(encoding='utf-8') + processed_body = process_html_body(original_body) + + frontmatter = f"""--- +title: "{final_title}" +permalink: "{permalink}" +description: "{description}" +date: {row.get('publish_date')} +--- + +""" + final_content = frontmatter + processed_body + output_file.write_text(final_content, encoding='utf-8') + print(f" > Successfully created: {slug}.md") + + print("\n--- Export Processing Complete! ---") + print(f"Processed {processed_count} new file(s).") + if skip_existing: + print(f"Skipped {skipped_count} existing file(s).") + + except Exception as e: + print(f"\nAn unexpected error occurred: {e}") def retry_failed_fetches(): """MODE 2: Retries fetching descriptions for previously failed files.""" - # ... (This function's code is correct and remains the same) - pass + print("\n--- Mode: Retry Failed Descriptions ---") + import_dir_str = input("Enter the path to the 'emails_ready_for_import' directory: ") + import_dir = Path(import_dir_str).expanduser() + if not import_dir.is_dir(): + print(f"\nERROR: The directory '{import_dir}' does not exist.") + return + + print(f"\nScanning for files with errors in: {import_dir}") + error_string_to_find = 'description: "Error fetching description."' + files_to_retry = [ + md_file for md_file in import_dir.glob("*.md") + if error_string_to_find in md_file.read_text(encoding='utf-8') + ] + + if not files_to_retry: + print("No files with fetching errors were found.") + return + + print(f"Found {len(files_to_retry)} file(s) to retry.") + for md_file in files_to_retry: + slug = md_file.stem + content = md_file.read_text(encoding='utf-8') + title_match = re.search(r'^title:\s*"(.*?)"', content, re.MULTILINE) + title = title_match.group(1) if title_match else "" + + print(f"\nRetrying email with slug: {slug}") + + new_description = get_web_description(slug, title).replace('"', "'") + + if new_description != "Error fetching description." and new_description != "No description available.": + new_desc_line = f'description: "{new_description}"' + updated_content = re.sub(r'^description:.*$', new_desc_line, content, count=1, flags=re.MULTILINE) + md_file.write_text(updated_content, encoding='utf-8') + print(f" > SUCCESS: Updated {md_file.name}") + else: + print(f" > FAILED: Could not retrieve a new description for {slug}.") def fix_alt_tags_in_folder(): """MODE 3: Scans an import-ready folder and fixes missing alt tags and comments.""" - # ... (This function's code is correct and remains the same) - pass + print("\n--- Mode: Fix Empty Alt Tags & Comments ---") + import_dir_str = input("Enter the path to the 'emails_ready_for_import' directory: ") + import_dir = Path(import_dir_str).expanduser() + if not import_dir.is_dir(): + print(f"\nERROR: The directory '{import_dir}' does not exist.") + return + + print(f"\nScanning files in: {import_dir}") + updated_files_count = 0 + + for md_file in import_dir.glob("*.md"): + original_content = md_file.read_text(encoding='utf-8') + modified_content = process_html_body(original_content) + + if modified_content != original_content: + print(f"Updating: {md_file.name}") + md_file.write_text(modified_content, encoding='utf-8') + updated_files_count += 1 + + print("\n--- Fixes Complete! ---") + if updated_files_count > 0: + print(f"Successfully updated {updated_files_count} file(s).") + else: + print("No files needed fixing.") def sync_latest_from_api(): """MODE 4: Fetches the latest email from the API and saves it to a configured path.""" - # ... (This function's code is correct and remains the same) - pass + print("\n--- Mode: Sync Latest Email ---") + + load_dotenv() + BUTTONDOWN_API_KEY = os.getenv("BUTTONDOWN_API_KEY") + SYNC_PATH = os.getenv("SYNC_PATH") + + if not BUTTONDOWN_API_KEY: + print("\nERROR: BUTTONDOWN_API_KEY not found in .env file.") + return + + headers = {"Authorization": f"Token {BUTTONDOWN_API_KEY}"} + today_str = datetime.now().strftime('%Y-%m-%d') + url = f"https://api.buttondown.email/v1/emails?&page=1&publish_date__start={today_str}" + + try: + print(f" > Fetching emails from Buttondown API for today ({today_str})...", flush=True) + response = requests.get(url, headers=headers) + response.raise_for_status() + + emails = response.json()["results"] + if not emails: + print("No emails found for today.") + return + + latest_email = sorted(emails, key=lambda x: x['publish_date'], reverse=True)[0] + print(f" > Found latest email: '{latest_email['subject']}'", flush=True) + + raw_subject = latest_email.get('subject', 'No Subject') + slug = latest_email.get('slug', '') + original_body = latest_email.get('body', '') + + description = latest_email.get('description') + if not description: + print(" > API 'description' not found. Generating from email body...", flush=True) + description = _generate_description_from_body(original_body) + else: + print(" > Using 'description' field from API.", flush=True) + + description = description.replace('"', "'") + final_title = raw_subject.replace('"', "'") + permalink = f"/archive/{slug}/" + + publish_date_obj = parse_date(latest_email.get('publish_date')) + formatted_date = publish_date_obj.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + '+00:00' + + processed_body = process_html_body(original_body) + + frontmatter = f"""--- +title: "{final_title}" +permalink: "{permalink}" +description: "{description}" +date: {formatted_date} +--- + +""" + final_content = frontmatter + processed_body + + if SYNC_PATH: + output_dir = Path(SYNC_PATH).expanduser() + if output_dir.is_dir(): + output_file = output_dir / f"{slug}.md" + try: + output_file.write_text(final_content, encoding='utf-8') + print(f"\nSuccessfully saved file to: {output_file}") + except Exception as e: + print(f"\nERROR: Could not write file. {e}") + else: + print(f"\nERROR: SYNC_PATH '{SYNC_PATH}' is not a valid directory. Printing to screen instead.") + _print_content_to_screen(final_content) + else: + print("\nWarning: SYNC_PATH not set in .env file. Printing to screen.") + _print_content_to_screen(final_content) + + except requests.exceptions.RequestException as e: + print(f"API request failed: {e}") + except (KeyError, IndexError): + print("Could not find expected data in API response.") + except Exception as e: + print(f"An unexpected error occurred: {e}") def create_daily_emails(): """MODE 5: Creates skeleton emails for today or the upcoming week.""" @@ -233,10 +433,13 @@ def create_sunday_digest(): response.raise_for_status() weekly_emails = sorted(response.json()['results'], key=lambda x: x['publish_date']) - digest_content = "" + digest_content_parts = [] for email in weekly_emails: - digest_content += f"## {email['subject']}\n\n{email['body']}\n\n" + cleaned_body = process_html_body(email['body']) + digest_content_parts.append(f"## {email['subject']}\n\n{cleaned_body}") + digest_content = "\n\n---\n\n".join(digest_content_parts) + if not weekly_emails: print(" - No posts found from the past week to compile.") digest_content = "No posts from the past week." @@ -257,13 +460,12 @@ def create_sunday_digest(): if previous_sunday_emails: last_sunday_body = previous_sunday_emails[0]['body'] - # Correctly split by the Markdown heading - parts = re.split(r'#\s*#OpenToWork Weekly', last_sunday_body) + parts = re.split(r'# #OpenToWork Weekly', last_sunday_body) if len(parts) > 1: open_to_work_content = "# #OpenToWork Weekly" + parts[1] print(" - Successfully extracted #OpenToWork Weekly section.") else: - print(" - WARNING: Could not find '# OpenToWork Weekly' heading in last Sunday's email.") + print(" - WARNING: Could not find '# #OpenToWork Weekly' heading in last Sunday's email.") else: print(" - WARNING: Could not find last Sunday's email.") @@ -271,24 +473,22 @@ def create_sunday_digest(): print(f" - ERROR fetching last Sunday's email: {e}") new_subject = f"ðŸŒķïļ Hot Fudge Sunday for {today.strftime('%Y-%m-%d')}" - new_body = f""" -## Last Week - -A look at the week behind... - -## This Week - -A look at the week ahead... - -{digest_content} -{open_to_work_content if open_to_work_content else '# #OpenToWork Weekly'} - """ + + new_body_parts = [ + "## Last Week", + "A look at the week behind...", + "## This Week", + "A look at the week ahead...", + digest_content, + open_to_work_content if open_to_work_content else "# #OpenToWork Weekly\n\nPlaceholder for open to work section." + ] + new_body = "\n\n".join(new_body_parts) print(f"\n > Creating new digest email: '{new_subject}'") payload = { "subject": new_subject, - "body": new_body.strip(), + "body": new_body, "status": "draft" }