# Welcome to Finite News!
This notebook creates and emails issues of Finite News. To facilitate simple scheduling of the notebook with Sagemaker, all the code lives inside the notebook. Papermill jobs cannot import local Python scripts. 

# Parameters for the notebook
The constants below are used for developing and debugging. All other parameters for the newspaper are configured in the files on S3.  
  
<div class="alert alert-block alert-info">
    <p><b>To deploy from dev to prod:</b></p>
    <ol>
        <li>Set <code>DEV_MODE</code> and <code>DISABLE_GPT</code> to <code>False</code>.</li>
        <li>Create a new <a href="https://docs.aws.amazon.com/sagemaker/latest/dg/create-notebook-auto-run-studio.html" target="_blank">scheduled notebook job</a> on the Data Science 2.0 image with Python 3.8.</li>
        <li>Delete the old notebook job if any.</li>
        <li>Shut down the Sagemaker instance if it was used during development.</li>
    </ol>
</div>

In [None]:
DEV_MODE = False # True = development/debug run: do not send emails and do not cache newly fetched headlines for later dedup
DISABLE_GPT = False # True = never call the GPT API, so we don't incur costs while debugging
LOGGING_LEVEL = "warning" # Granularity of log messages that go in the admin's issue / the local log file
                       # Use "warning" by default.
                       # Use "info" to get more detailed Finite News messages for debugging
                       # Use "debug" to also get lower-level messages from dependencies
                       # Reminder: the logger can only be initialized once per kernel. To change the logging level when re-running the notebook, restart the kernel first

# Imports

The next cell installs specific versions of packages that I've found play nicely together on SageMaker, for both local runs and scheduled jobs, using `Python 3.8` on an `ml.m5.large` instance and the `Data Science 2.0` image.  
  
ℹ️ If you're running FiniteNews in a different environment and you get errors here:
1. Try finding different versions that play nicely in your environment. Start by `pip` installing offending packages without a pinned version number and see which versions `pip` found.
2. If you're still getting errors on `sentence-transformers` (or its dependencies like `pytorch` or `GLIBCXX`), try a different environment. For example it may help to use the SageMaker image for `Pytorch 1.12 Python 3.8`.

In [None]:
%pip install --quiet beautifulsoup4==4.12.2 boto3==1.33.9 botocore==1.33.9 env_canada==0.6.1 emoji==2.12.1 \
feedparser==6.0.11 ipywidgets==7.6.5 jupyterlab_widgets==1.0.0 openai==0.27.7 \
pandas==1.3.4 s3fs==2024.2.0 seaborn==0.11.2 sendgrid==6.10.0 sentence-transformers==2.3.1 \
widgetsnbextension==3.5.1 yfinance==0.2.33 matplotlib

In [None]:
import asyncio
import base64
import boto3
from botocore.exceptions import ClientError
from bs4 import BeautifulSoup
import calendar
from copy import deepcopy
from datetime import date, datetime, timedelta
from dateutil.relativedelta import relativedelta
import emoji
from env_canada import ECWeather
import feedparser
from io import BytesIO, StringIO
from itertools import combinations
import json
import logging
from matplotlib import pyplot as plt
from matplotlib.text import Text
%config InlineBackend.figure_format = 'svg' # Makes plots higher quality
import nest_asyncio
nest_asyncio.apply() # Allows us to use async libraries like env_canada easily within the notebook using asyncio.run()
import numpy as np
import openai
import pandas as pd
import pytz
from random import choice
import requests
import s3fs
fs = s3fs.S3FileSystem()
import seaborn as sns
from sendgrid import Attachment, SendGridAPIClient
from sendgrid.helpers.mail import Mail
from sentence_transformers import SentenceTransformer
from sentence_transformers.util import cos_sim
from time import sleep
from tqdm.notebook import tqdm
import traceback
import yaml
import yfinance as yf

# Functions

## 📦 Load
Import data and initialize variables

### General assets

In [None]:
def init_logging(logging_level, dev_mode):
    """Initialize root logging to either
        * (default) an in-memory object (for optional delivery in admin's issue of Finite News) or
        * (if dev_mode=True) a local log file, app.log

    NOTE
    Reminder: This function doesn't reset an active log, because logging.basicConfig() does nothing
    once the root logger has handlers. Must restart the kernel in SageMaker.

    ARGUMENTS
    logging_level (str): The granularity of logging messages: 'warning', 'info' or 'debug' (case insensitive).
        Any other value falls back to 'warning' and is reported in the log.
    dev_mode (bool): If False, we're in prod mode and logs will go to log_stream. If True, will send logs to local file

    RETURNS
    log_stream (StringIO object or None): If dev_mode=False, the in-memory file-like object that collects
        results from logging during the Finite News run. If dev_mode=True, None.
    """

    levels = {
        "warning": logging.WARNING,
        "info": logging.INFO,
        "debug": logging.DEBUG,
    }
    level = levels.get(str(logging_level).strip().lower())
    unrecognized_level = level is None
    if unrecognized_level:
        # Don't crash the whole run over a typo in a debugging knob
        level = logging.WARNING

    if dev_mode:
        # Local file
        logging.basicConfig(
            filename='app.log',
            level=level,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        log_stream = None
    else:
        # Create in-memory file-like object
        log_stream = StringIO()
        logging.basicConfig(stream=log_stream, level=level)

    if unrecognized_level:
        # Report only after logging is configured so the message lands in the chosen destination
        logging.warning(f"Unrecognized logging_level {logging_level!r}. Using 'warning' instead.")
    return log_stream


def get_fn_secret(secret_key, secret_name="fn_secrets", region_name="us-east-1"):
    """Look up one Finite News secret stored in AWS Secrets Manager.

    ARGUMENTS
    secret_key (string): which secret to pull out of the group, such as BUCKET_PATH or OPENAI_API_KEY
    secret_name (string): the Secrets Manager secret (group) that holds the Finite News secrets as a JSON object
    region_name (string): the AWS region where secret_name lives. See the sample code provided by Secrets Manager after you create the secret

    RETURNS
    secret_value (string): the secret!

    RAISES
    botocore ClientError: if Secrets Manager cannot return the secret. Not handled here: stop the presses, we can't get our secret.
    KeyError: if the response or the stored JSON lacks the requested key
    """

    secrets_client = boto3.session.Session().client(
        service_name="secretsmanager",
        region_name=region_name,
    )

    # A ClientError from this call is allowed to propagate to the caller unchanged
    response = secrets_client.get_secret_value(SecretId=secret_name)

    # The secret string is a JSON document holding every Finite News secret
    try:
        all_secrets = json.loads(response["SecretString"])
        return all_secrets[secret_key]
    except KeyError as e:
        # No sense in logging the exception since we won't be sending any emails (where we store logs)
        raise KeyError(f"Secret key {str(e)} not found. Is it stored in AWS Secrets Manager? Have you given permissions for your SageMaker user to access the secret?")
        

def load_s3(bucket_path, file_path, required=True):
    """Read a file from S3 and parse it according to its extension.

    Supported extensions: "yml" (parsed with YAML), "htm"/"html" (returned as one string)
    and "txt" (returned as a list of lines).

    ARGUMENTS
    bucket_path (str): The location of the S3 bucket where required files are stored.
    file_path (str): The name or path of the file, appended directly to bucket_path
    required (bool): Should we error out if we can't load it?

    RETURNS
    The parsed file contents, or None if the extension is unsupported or an optional file could not be loaded
    """
    extension = file_path.split(".")[-1]
    try:
        with fs.open(bucket_path + file_path, "r") as s3_file:
            if extension == "yml":
                # NOTE(review): yaml.Loader can construct arbitrary Python objects. Fine while the bucket
                # is trusted; consider yaml.safe_load if config files could ever come from elsewhere.
                contents = yaml.load(s3_file, Loader=yaml.Loader)
            elif extension in ("htm", "html"):
                contents = s3_file.read()
            elif extension == "txt":
                contents = s3_file.readlines()
            else:
                logging.warning(f"Unsupported file type in load_s3: {file_path}")
                return None
        logging.info(f"Read {file_path} from S3")
        return contents

    except Exception as e:
        error_message = f"Couldn't load {file_path} from S3. {str(type(e))}, {str(e)}"
        if not required:
            # Optional file: note the problem and carry on without it
            logging.warning(error_message)
            return None
        logging.critical(error_message)
        raise e


def load_publication_config(
    publication_config_file_name="publication_config.yml",
    dev_mode=False,
    disable_gpt=False
):
    """Import general settings and assets from files on S3, used for all subscribers

    ARGUMENTS
    publication_config_file_name (str): file name for the general publication parameters YML file in the S3 bucket identified by BUCKET_PATH
    dev_mode (bool): If True we're in development or debug mode, so don't send emails or modify headline_logs.
    disable_gpt (bool): If True, don't call the GPT API and incur costs, for example during dev or debug cycles.

    RETURNS
    publication_config (dict): General settings for all subscribers. "thoughts_of_the_day" is always a list
        (empty when the feature is disabled or the shared thoughts file can't be loaded).
    """

    bucket_path = get_fn_secret("BUCKET_PATH")

    # Load publication settings
    publication_config = load_s3(bucket_path, publication_config_file_name)
    editorial_config = publication_config["editorial"]
    thoughts_enabled = editorial_config.get("enable_thoughts_of_the_day", False)

    # The shared thoughts file is optional. Always end up with a list so that
    # downstream code can concatenate onto it without checking for None.
    thoughts_of_the_day = []
    if thoughts_enabled:
        shared_thoughts = load_s3(bucket_path, "thoughts_of_the_day.yml", required=False)
        if shared_thoughts:
            thoughts_of_the_day = shared_thoughts.get("quotes") or []

    return {
        "bucket_path": bucket_path,
        "email_delivery": not dev_mode, # If dev_mode is True, don't send emails
        "sender": publication_config["sender"],
        "layout": {
            # The template is required; load_s3 raises if it can't be read
            "template_html": load_s3(bucket_path, "template.htm"),
            "logo_url": publication_config["layout"]["logo_url"],
        },
        "editorial": {
            "one_headline_keywords": editorial_config.get("one_headline_keywords", []),
            "substance_rules": load_s3(bucket_path, "substance_rules.yml"),
            "cache_issue_content": not dev_mode,
            "gpt": editorial_config.get("gpt", None) if not disable_gpt else None,
            "smart_deduper": editorial_config.get("smart_deduper", None),
            "enable_thoughts_of_the_day": thoughts_enabled
        },
        "forecast" : publication_config.get("forecast", {}),
        "news_sources": publication_config["news_sources"],
        "events_sources": publication_config.get("events_sources", []),
        "alerts_sources": publication_config.get("alerts_sources", []),
        "image_sources": publication_config.get("image_sources", []),
        "thoughts_of_the_day": thoughts_of_the_day
    }

### Subscriber assets

In [None]:
def get_subscriber_list(bucket_path, folder_name="finite_files"):
    """List the subscriber config files stored on the Finite News bucket.

    ARGUMENTS
    bucket_path (str): The location of the S3 bucket where required files are stored. The bucket name is taken
        as the text between "//" and the next "/".
    folder_name (str): The folder on the bucket that holds the config files. It is used as the listing prefix
        and is removed from the returned names.

    NOTE:
    1. Assumes the folder is at the root of the bucket. If it's nested, use relative path up to root.
    2. Assumes all files in the folder that begin with "config_" are a subscriber config file.

    RETURNS
    subscriber_config_file_names (list): names of the subscriber config files, without the folder prefix
    """

    s3 = boto3.resource("s3")
    bucket_name = bucket_path.split("//")[1].split("/")[0]
    fn_bucket = s3.Bucket(bucket_name)

    folder_prefix = f"{folder_name}/"
    config_prefix = f"{folder_name}/config_"

    # Walk the objects in the folder and keep the ones named like subscriber configs
    subscriber_config_file_names = []
    for s3_object in fn_bucket.objects.filter(Prefix=folder_prefix):
        if s3_object.key.startswith(config_prefix):
            subscriber_config_file_names.append(s3_object.key.replace(folder_prefix, ""))
    return subscriber_config_file_names


def filter_sources(sources, selections, criterion="name"):
    """Applies subscriber's selections to list of sources

    ARGUMENTS
    sources (list of dict): Descriptions of sources from publication_config. None is treated as no sources.
    selections (list of str): Names/Categories of sources that subscriber wants. None or empty selects nothing.
        A single string is treated as a one-item list.
    criterion (str): "name" or "category" for how to filter

    RETURNS
    sources_filtered (list of dict): Subset of sources that match subscriber's selections, ordered like the
        subscriber's selections (sources that share a selection keep their original relative order)
    """
    # A config key that is present but left blank arrives as None
    sources = sources or []
    if isinstance(selections, str):
        # Avoid accidental substring matching when a lone string is configured instead of a list
        selections = [selections]
    selections = list(selections or [])

    selected, rejected = [], []
    for source in sources:
        # .get(): a source that lacks the criterion key simply can't be selected
        if source.get(criterion) in selections:
            selected.append(source)
        else:
            rejected.append(source)

    # Keep the order of the subscriber's selections; list.sort is stable, like sorted()
    selected.sort(key=lambda source: selections.index(source[criterion]))

    rejected_labels = [source.get("name", source.get(criterion)) for source in rejected]
    logging.info(f"Filtered out sources not in {selections}: {rejected_labels}")
    return selected


def day_name_to_number(day_name):
    """Helper function to convert a named day like "Friday" to its ISO weekday number, like 5.

    The result lines up with date.isocalendar() and date.isoweekday().

    ARGUMENTS
    day_name (str): Fully spelled out day of week. Case insensitive; surrounding whitespace is ignored

    RETURNS
    day_number (int): Number from 1-7, where 1 = Monday and 7 = Sunday

    RAISES
    ValueError: if day_name is not the name of a day of the week
    """
    # calendar.day_name starts on Monday; count from 1 to match the ISO numbering
    iso_numbers = {
        name.lower(): number
        for number, name in enumerate(calendar.day_name, start=1)
    }
    try:
        return iso_numbers[day_name.strip().lower()]
    except KeyError:
        raise ValueError(
            f"Unrecognized day of week: {day_name!r}. Expected one of: {', '.join(calendar.day_name)}"
        ) from None


def parse_frequency_config(frequency_config):
    """Decide whether today is a delivery day for a scheduled section of the paper.

    Supported values of frequency_config["frequency"]:
    "daily", "weekdays", "weekly", "every_other_week" and "monthly".

    NOTE
    * Reminder: When adding new frequencies, update get_stocks_plot()
    * Assumes the paper is delivered once per day. So "daily" config always returns True.

    ARGUMENTS
    frequency_config (dict): Parameters for a cycle. Besides "frequency", it may hold "day_of_month" (monthly,
        defaults to 1), "day_of_week" (weekly / every_other_week, defaults to "Monday") and "eow_odd"
        (every_other_week: True for odd ISO week numbers, defaults to False for even ones)

    RETURNS
    match (bool): Is today on the schedule? False when the config is missing or the frequency is not recognized.
    """

    if not frequency_config:
        logging.warning("Missing frequency config, assumed to be False")
        return False

    frequency = frequency_config.get("frequency", None) # The cadence label
    today = date.today()

    if frequency == "daily":
        logging.info("parse_frequency_config, result: True, because 'daily' is always True")
        return True

    elif frequency == "weekdays":
        # ISO weekdays run from Monday=1 to Sunday=7
        match = today.isoweekday() < 6
        logging.info(f"parse_frequency_config, result: {match}, requested: weekeday (iso number <6), today: {today.isoweekday()}.")
        return match

    elif frequency == "monthly":
        dom = frequency_config.get("day_of_month", 1) # Day of the month the subscriber wants
        dom_today = today.day
        match = dom == dom_today
        logging.info(f"parse_frequency_config, result: {match}. Today: {dom_today}. Requested: {dom}")
        return match

    elif frequency == "weekly":
        dow_number = day_name_to_number(frequency_config.get("day_of_week", "Monday"))
        today_dow_number = today.isocalendar()[2] # ISO day of week, as an integer
        match = today_dow_number == dow_number
        logging.info(f"parse_frequency_config, result: {match}. Today dow number: {today_dow_number}. Requested: {frequency_config.get('day_of_week')}, dow_number: {dow_number}")
        return match

    elif frequency == "every_other_week":
        dow_number = day_name_to_number(frequency_config.get("day_of_week", "Monday"))
        eow_odd = frequency_config.get("eow_odd", False) # Odd week numbers, or even?
        week_number, today_dow_number = today.isocalendar()[1:] # ISO week of year and day of week
        if eow_odd:
            week_number_match = week_number % 2 == 1
        else:
            week_number_match = week_number % 2 == 0
        # Must be both the requested day of the week and a requested week
        match = today_dow_number == dow_number and week_number_match
        logging.info(f"parse_frequency_config, result: {match}. Today week_number, dow_number: {week_number, today_dow_number}. Requested: dow_number: {dow_number}, eow_odd: {eow_odd}")
        return match

    logging.warning(f"Unexpected value for frequency: {frequency}. Not parsed.")
    return False


def load_events_config(publication_events_sources, subscriber_sources):
    """Pick the events calendar sources to include in a subscriber's issue today, if any.

    Includes deciding if today meets the subscriber's frequency for including events in their issue.
    When the subscriber sets no events frequency, events are treated as daily.

    ARGUMENTS
    publication_events_sources (list of dict): The source config for event-type sources in the publication, if present
    subscriber_sources (dict): The subscriber's source configuration, which may or may not include an "events"
        section with "sources" (list of source names) and "frequency" (a frequency config)

    RETURNS
    event_sources (list of dict): Source configuration for the events sources the subscriber selected.
        Empty if today is off schedule, nothing is configured, or anything goes wrong (logged as a warning).
    """

    try:
        events_preferences = subscriber_sources.get("events", {})
        subscriber_events_sources = events_preferences.get("sources", [])
        frequency_match = parse_frequency_config(
            events_preferences.get("frequency", {"frequency":"daily"})
        )

        # Guard clauses: nothing to report unless it's the right day and both sides list sources
        if not frequency_match:
            return []
        if len(publication_events_sources) == 0 or len(subscriber_events_sources) == 0:
            return []
        return filter_sources(publication_events_sources, subscriber_events_sources)
    except Exception as e:
        logging.warning(f"Unhandled exception in load_events_config: {str(type(e))}, {str(e)}. publication_events_sources: {publication_events_sources}. subscriber_sources: {subscriber_sources}")
        return []
        
        
def load_stocks_config(subscriber_sources):
    """Read the subscriber's stock section settings, if any, and decide whether it runs today.

    ARGUMENTS
    subscriber_sources (dict): The subscriber's source configuration, which may or may not include a "stocks"
        section. That section doubles as a frequency config and holds "tickers", a list of comma-separated strings.

    RETURNS
    stocks (list of lists): Lists of tickers for each plot [ [TICKER1, TICKER2], [TICKER3, TICKER4] ], or empty list for none
    frequency (str): How often we are delivering this section. Used to determine how much history to put in plot.
        None whenever stocks is empty.
    """

    try:
        stocks_config = subscriber_sources.get("stocks", None)
        if not stocks_config:
            return [], None

        frequency_match = parse_frequency_config(stocks_config)
        frequency = stocks_config.get("frequency", None)
        ticker_sets = stocks_config.get("tickers", [])
        if len(ticker_sets) == 0 or not frequency_match:
            return [], None

        # Each configured string like "AAA, BBB" becomes one plot's list of tickers
        stocks = []
        for ticker_set in ticker_sets:
            stocks.append([ticker.strip() for ticker in ticker_set.split(",")])
        return stocks, frequency

    except Exception as e:
        logging.warning(f"Unhandled exception in load_stocks_confg: {str(type(e))}, {str(e)}. subscriber_sources: {subscriber_sources}")
        return [], None
    

def load_subscriber_config(subscriber_config_file_name, publication_config):
    """Import subscriber-specific parameters and combine with general publication settings

    The subscriber's "editorial" and "sources" sections are optional; when one is missing or blank,
    defaults are used for everything it would have configured.

    ARGUMENTS
    subscriber_config_file_name (str): name of the subscriber's config YML file in the S3 bucket
    publication_config (dict): loaded general publication parameters

    RETURNS
    issue (dict): Settings for an issue, combining subscriber and general publication parameters.
        None if today is not in the subscriber's issue_frequency.

    RAISES
    KeyError: if the subscriber config lacks a required key ("email", "slogans")
    """

    # Transfer general settings from publication config
    issue = deepcopy(publication_config) # Copy dict with nested dicts

    # Load subscriber's specific settings
    subscriber_config = load_s3(issue["bucket_path"], subscriber_config_file_name)

    # Check are we delivering this issue today?
    if not parse_frequency_config(
        subscriber_config.get("issue_frequency", {"frequency":"daily"})
    ):
        logging.info(f"{subscriber_config['email']}: No issue today, not in issue_frequency.")
        return None

    # "or {}" also covers a section that is present in the YML but left blank (loaded as None)
    subscriber_editorial = subscriber_config.get("editorial") or {}
    subscriber_sources = subscriber_config.get("sources") or {}

    issue["admin"] = subscriber_config.get("admin", False)
    issue["sender"]["subject"] = subscriber_editorial.get("subject", "Finite News")
    issue["subscriber_email"] = subscriber_config["email"]

    issue["editorial"]["add_car_talk_credit"] = subscriber_editorial.get("add_car_talk_credit", False)
    issue["editorial"]["cache_path"] = subscriber_editorial.get("cache_path") or ""
    if issue["editorial"]["cache_path"] == "":
        logging.warning("No cache_path. Not logging new content or removing content already presented in last year.")
    else:
        # If cache file doesn't exist, create empty file
        cache_location = issue["bucket_path"] + issue["editorial"]["cache_path"]
        if not fs.exists(cache_location):
            with fs.open(cache_location, "wb") as f:
                f.write(b"")
    issue["requests_timeout"] = subscriber_editorial.get("requests_timeout", 30)

    issue["news_sources"] = filter_sources(
        issue["news_sources"],
        subscriber_sources.get("news_categories") or [],
        "category"
    )
    issue["events_sources"] = load_events_config(publication_config["events_sources"], subscriber_sources)
    issue["alerts_sources"] = filter_sources(
        issue["alerts_sources"],
        subscriber_sources.get("alerts_sources") or [],
        "name"
    )
    # MBTA alerts are configured per subscriber, each with its own delivery frequency
    issue["alerts_sources"] += [
        {
            "name": "MBTA API: Alerts",
            "type": "mbta_alerts",
            "route": mbta_source.get("route", None),
            "stations": mbta_source.get("stations", []),
            "direction_id": mbta_source.get("direction_id", None),
        }
        for mbta_source in subscriber_sources.get("mbta") or []
        if parse_frequency_config(mbta_source)
    ]
    issue["image_sources"] = filter_sources(
        issue["image_sources"],
        subscriber_sources.get("image_categories") or [],
        "category"
    )
    issue["stocks"], issue["stocks_frequency"] = load_stocks_config(subscriber_sources)
    issue["sports"] = subscriber_config.get("sports", {})
    issue["forecast"] = subscriber_config.get("forecast", {})
    if issue["forecast"]:
        issue["forecast"]["api_snooze_bar"] = (publication_config.get("forecast") or {}).get("api_snooze_bar", None)

    issue["slogans"] = subscriber_config["slogans"]
    if publication_config["editorial"]["enable_thoughts_of_the_day"]:
        # Work on a copy, and tolerate a missing shared list (None) on the publication side
        thoughts = list(subscriber_config.get("thoughts_of_the_day") or [])
        if thoughts and subscriber_editorial.get("add_shared_thoughts", False):
            thoughts += publication_config.get("thoughts_of_the_day") or []
        issue["thoughts_of_the_day"] = thoughts
    else:
        issue["thoughts_of_the_day"] = []
    return issue


def load_subscriber_configs(dev_mode, disable_gpt):
    """Build the issue config for every subscriber who gets an issue today.

    Each issue config combines the publication settings with one subscriber's settings.

    ARGUMENTS
    dev_mode (bool): If True we're in development or debug mode, so don't send emails or modify headline_logs.
    disable_gpt (bool): If True, don't call the GPT API and incur costs, for example during dev or debug cycles.

    RETURNS
    subscriber_configs (list): issue_config for each subscriber we need to generate an issue for, admins last
    """

    publication_config = load_publication_config(dev_mode=dev_mode, disable_gpt=disable_gpt)

    subscriber_configs = []
    for subscriber_config_file_name in get_subscriber_list(publication_config["bucket_path"]):
        issue_config = load_subscriber_config(subscriber_config_file_name, publication_config)
        # None means today is not in the issue_frequency for that subscriber
        if issue_config is not None:
            subscriber_configs.append(issue_config)

    # Put the "admins" last (False sorts before True; the sort is stable).
    # Allows the admin email issue(s) to include logging warnings from the non-admin issues.
    subscriber_configs.sort(key=lambda issue_config: issue_config["admin"])
    return subscriber_configs

## 🕵🏻‍♀️ Report
Research the content for an issue

### General reporting

In [None]:
def dedup(l):
    """Drop repeated items from a list, keeping the first occurrence of each in its original position.

    Unlike list(set()), the order of the elements is preserved.

    ARGUMENTS
    l (list): A list of hashable items

    RETURNS
    l_dedup (list): The list in its original order, but without dups

    """
    # dict keys are unique and remember insertion order
    return list(dict.fromkeys(l))


def heal_inner_n(s):
    """Join the text before the first newline and the text after the last newline with ": ".

    NOTES
    * Assumes newlines have already been removed from the ends of the string
    * Anything between the first and the last newline is dropped

    ARGUMENTS
    s (str): A string with or without one or more newlines in the middle

    RETURNS
    The string unchanged if it has no newline, otherwise "<first line>: <last line>"
    """

    lines = s.split("\n")
    if len(lines) == 1:
        return s
    return f"{lines[0]}: {lines[-1]}"


def create_calendar_sitemap_url(base_url, path_format, substract_one_day):
    """Build the URL of a date-specific page on a site that organizes its sitemap chronologically.

    Useful for sites where there's no good way to ensure a page with headlines ordered by recency,
    where normal scraping would lead to old headlines reemerging.

    ARGUMENTS
    base_url (str): The core part of the URL that we'll add onto, e.g. "http://www.website.com/sitemap/"
    path_format (str): The format for the path pointing to the date we want to hit. e.g. "full_year/month_lower/day"
        Supported elements (in any order): full_year (2024), month_lower (august), month_title_case (August), day (5)
    substract_one_day (bool): Whether to point at yesterday's date instead of today's

    RETURNS
    string with fully specified url to the date desired, e.g. "http://www.website.com/sitemap/2024/august/5"
    """
    target_date = date.today()
    if substract_one_day:
        target_date = target_date - timedelta(days=1)

    month_name = target_date.strftime("%B")
    # Applied in this order, one placeholder at a time
    substitutions = (
        ("full_year", str(target_date.year)),
        ("month_lower", month_name.lower()),
        ("month_title_case", month_name.title()), # Initial-capitalized
        ("day", str(target_date.day)),
    )

    path = path_format
    for placeholder, value in substitutions:
        path = path.replace(placeholder, value)
    return base_url + path


def scrape_source(source, requests_timeout, retry=True):
    """Fetch a web location, extract text items from its HTML, and clean them.

    NOTES
    * How items are located depends on which keys are present in source, checked in this order:
      "select_query", "tag_class" (with "tag"), "multitag_group" (with "multitag_tags"), otherwise plain "tag".
    * Best effort: any failure is logged as a warning and an empty list is returned.
    * If nothing was scraped, the source is tried one more time after a 3 second pause (unless retry=False
      or the source sets "exclude_from_0_results_warning"), and the result of that second attempt is returned.

    ARGUMENTS
    source (dict): Description of the website to scrape
    requests_timeout (int): Number of seconds to wait before giving up on an HTTP request
    retry (bool): Whether to run the request again if no items were scraped

    RETURNS
    items (list of str): Text retrieved, cleaned and de-duplicated

    """
    try:
        if "calendar_sitemap_format" in source:
            url = create_calendar_sitemap_url(
                source["url"],
                source.get("calendar_sitemap_format"),
                source.get("calendar_sitemap_subtract_one_day", False)
            )
        else:
            url = source["url"]

        if source.get("specify_request_headers", False):
            headers = {
                'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36'
            }
        else:
            headers = None

        try:
            response = requests.get(url, headers=headers, timeout=requests_timeout)
        except requests.exceptions.SSLError as e:
            logging.warning(f"SSL error on {source['name']}, {url}. {str(type(e))}, {str(e)}")
            return []
        except requests.exceptions.Timeout as e:
            logging.warning(f"Request timed out after {requests_timeout} seconds: {url}. More details: {source['name']}, {str(type(e))}, {str(e)}")
            return []
        except Exception as e:
            logging.warning(f"Requests error on {source['name']}, {url}. {str(type(e))}, {str(e)}")
            return []

        soup = BeautifulSoup(response.text, features=source.get("parser", "html.parser"))

        if "select_query" in source: # Scrape the content using a BeautifulSoup query
            items = soup.select(source["select_query"])
        elif "tag_class" in source: # Scrape the content by finding tags with a specific class
            items = soup.find_all(source["tag"], {"class":source["tag_class"]})
        elif "multitag_group" in source: # Scrape the content by finding repeating groups of tags and combine the text of each set of tags.
            groups = soup.find_all(source["multitag_group"])
            separator = source.get("multitag_separator", " ")
            items = [] # Stays empty if no multitag_tags are configured
            for i, tag in enumerate(source["multitag_tags"]): # Iteratively append text from multiple consecutive tags into each string
                if i==0:
                    items = [f"{group.find(tag).text}" for group in groups]
                else:
                    try:
                        items = [f"{item_text}{separator}{group.find(tag).text}" for item_text, group in zip(items, groups)]
                    except Exception as e:
                        logging.warning(f"multitag error on {source['name']} while appending tag {tag}. Maybe tag not present for all items? {str(type(e))}, {str(e)}")
        else:
            items = soup.find_all(source["tag"])

        if "tag_next" in source: # Scrape a specified tag that appears _after_ "tag"
            items = items[0].findNext(source["tag_next"])

        if "detail_page_root" in source: # Scrape a child page
            # We have to go deeper!
            detail_link = items[0].attrs["href"]
            header = items[0].get_text().strip()
            # Request a detail page
            response = requests.get(source["detail_page_root"] + detail_link, headers=headers, timeout=requests_timeout)
            soup = BeautifulSoup(response.text, features=source.get("parser", "html.parser"))

            # Get the detail image
            img_element = soup.find_all("img")[source["detail_img_number"]-1]
            alt = img_element.attrs['alt']
            src = img_element.attrs['src']
            # TODO: Make the following reuse code above
            if "detail_text_tag_class" in source:
                text = soup.find_all(source["detail_text_tag"], {"class":source["detail_text_tag_class"]})[0].get_text().strip()
            elif "detail_text_tag" in source:
                text = soup.find_all(source["detail_text_tag"])[0].get_text().strip()
            else:
                text=""

            if source.get("add_http_img", False):
                src = f"http:{src}"
            items = [f"""<h4>{header}</h4><img alt="{alt}" src="{src}"><p>{text}</p>"""]
        elif "split_char" in source:
            items = [item for item in items.get_text().split(source["split_char"]) if item]
        elif "multitag_group" not in source: # multitag scraping at this stage already has text for each item. The other approaches need us to extract the text.
            items = [item.get_text() for item in items]

        # If at first you don't succeed...try just one more time
        # Some sources are finnicky and work better with two swings
        # But don't retry if this is a source we expect 0 results often
        if not items and retry and not source.get("exclude_from_0_results_warning", False):
            logging.info(f"No items scraped. Waiting 3 seconds and retrying.... {source['name']}")
            sleep(3)
            # The second attempt does its own cleaning and dedup, so hand back its result as is
            return scrape_source(source, requests_timeout, retry=False)

        # Apply certain text cleaning that depends on source config
        # TODO: Move these to editing; keep items associated with their source config longer
        # Also because then user can apply these configs to API sources, not just scrapes

        # Check if certain phrases are present/absent
        if "must_contain" in source:
            # When it's a list, it's an OR
            if isinstance(source["must_contain"], list):
                items = [
                    h for h in items
                    if any(must_contain.lower() in h.lower() for must_contain in source["must_contain"])
                ]
            else:
                items = [h for h in items if source["must_contain"].lower() in h.lower()]
        if "cant_contain" in source:
            if isinstance(source["cant_contain"], list):
                cant_contains = source["cant_contain"]
            else:
                cant_contains = [source["cant_contain"]]
            for cant_contain in cant_contains:
                items = [h for h in items if cant_contain.lower() not in h.lower()]

        # Clean text
        if "remove_text" in source:
            items = [h.replace(source["remove_text"],"") for h in items]

        # Remove any mix of \r, \n and \t from the ends of strings. Needed before heal_inner_n
        items = [h.strip("\r\n\t") for h in items]

        # Clean strings with a "\n" in the middle
        if "heal_inner_n" in source:
            items = [heal_inner_n(item) for item in items]

        # Ensure each string is long enough.
        if "min_words" in source:
            items = [item for item in items if len(item.strip().split(" "))>=source["min_words"]] # simple way to count words

        return dedup(items)
    except Exception as e:
        logging.warning(f"Source failed on {source['name']}. {str(type(e))}, {str(e)}. Source: {source}")
        return []


def _follow_path(entry, path):
    """Walk into a nested feed entry by applying each key/index of path in turn."""
    node = entry
    for step in path:
        node = node[step]
    return node


def research_source(source, requests_timeout):
    """Get a source's content, whether from API or scraping, and format into desired structure.

    NOTE
    See also clean_headline() for post-processing that's done on the level of an individual headline.
    Any exception raised while researching the source is logged as a warning and an empty list is returned.

    ARGUMENTS
    source (dict): Description of the API to call or website to scrape. Needs "name" and "type".
        Types other than "events_calendar", "reminder" and "mbta_alerts" also need "method":
        one of "api", "scrape", "rss_images" or "atom".
    requests_timeout (int): Number of seconds to wait before giving up on an HTTP request

    RETURNS
    items (list of str): Content found from source
    or
    html (str): Formatted block of html (for "events_calendar" sources)

    """
    try:
        # Get specialized content
        source_type = source["type"]
        if source_type == "events_calendar":
            return get_calendar_events(source, requests_timeout)
        if source_type == "reminder":
            if parse_frequency_config(source.get("frequency", None)):
                return [source.get("reminder_message", None)]
            return []
        if source_type == "mbta_alerts":
            route = source.get("route")
            stations = source.get("stations")
            direction_id = source.get("direction_id")
            # direction_id 0 means outbound, so only a missing/blank value is rejected (not every falsy one)
            if not route or not stations or direction_id in (None, ""):
                logging.warning(f"mbta_alert not checked. Expected route, stations, and direction_id. Found: {source}")
                return []
            return get_mbta_alerts(route, stations, direction_id, requests_timeout)

        # Get general content
        method = source["method"]
        if method == "api":
            response = requests.get(source["url"] + get_fn_secret(source["api_key_name"]), timeout=requests_timeout)
            items = [item[source["headline_field"]] for item in response.json()["results"]]
        elif method == "scrape":
            items = scrape_source(source, requests_timeout)
        elif method == "rss_images":
            if "get_img_tag_under_this_key" in source:
                html_blocks = [
                    entry[source["get_img_tag_under_this_key"]]
                    for entry in feedparser.parse(source["url"]).entries
                ]
                # Extract the first <img> tag in each html_block
                items = [f'''
                        <h4>{source.get("header","")}</h4>
                        {BeautifulSoup(html_block, "html.parser").find("img")}'''
                        for html_block in html_blocks
                ]
                # Drop items that have no <img> component, only <h4>
                items = [item for item in items if "<img" in item]
            else:
                urls = [
                    entry["media_content"][0].get("url", None)
                    for entry in feedparser.parse(source["url"]).entries
                    if "media_content" in entry and entry["media_content"]
                ]
                items = [f'''
                        <h4>{source.get("header","")}</h4>
                        <img src="{url}">'''
                        for url in urls
                        if url # Skip media without a URL rather than emitting src="None"
                ]
        elif method == "atom":
            # Only the newest entry of the feed is used
            newest_entry = feedparser.parse(source["url"]).entries[0]
            header = ""
            if "header_path" in source:
                header_text = _follow_path(newest_entry, source["header_path"])
                header = f"<h4>{source.get('header_preface', '')}{header_text}</h4>"
            img = ""
            if "image_path" in source:
                img = _follow_path(newest_entry, source["image_path"])
            body = ""
            if "body_path" in source:
                body = f"<p>{_follow_path(newest_entry, source['body_path'])}</p>"
            items = [f"""{header}{img}{body}"""]
        else:
            # Say what is wrong instead of failing later on an unassigned `items`
            logging.warning(f"Unknown method for source {source['name']}: {method}")
            return []

        # Lightly postprocess results
        if items:
            items = [item.replace("\n","").strip() for item in items if item]
            max_items = source.get("max_items", source.get("max_headlines", None)) # The attribute can have either name
            items = items[0:max_items]
        # Log count
        if len(items)==0 and not source.get("exclude_from_0_results_warning", False): # Escalate to admin if no results were returned, and that was unexpected. Source's scraper/API may be broken.
            logging.warning(f"{source['name']}: retrieved 0 items")
        else:
            logging.info(f"{source['name']}: retrieved {len(items)} items")

        # Add prefaces and return
        if source_type in ["headlines", "image_url"]:
            return [f"{source.get('preface','')}{item}" for item in items] # Add preface, if requested
        elif source_type == "alert_new":
            return [
                f"""{source.get('alert_preface', '')} <a href="{source['url']}" target="_blank">{item}</a>"""
                for item in items
            ] # Wrap the alert in a URL. Add preface, if requested (add separately from regular 'preface', to isolate item)
        else:
            logging.warning(f"Unknown type of source {source['type']}: {str(source)}")
            return []
    except Exception as e:
        logging.warning(f"Error getting content from source {source['name']}: {str(e)}")
        return []

def get_attributions(general_sources, sports_tracked, weather_source, stocks_used, car_talk_used):
    """Build the alphabetized list of credits for everything an issue drew on.

    ARGUMENTS
    general_sources (list of dict): Sources we tried to get news, alerts, etc from. Each needs a "name"
    sports_tracked (dict): Sports configuration; the keys "nba_teams" and "nhl_teams" trigger a credit
    weather_source (str): The forecast "source" attribute from subscriber's config, or None
    stocks_used (bool): True if we included stock data
    car_talk_used (bool): True if subscriber includes Car Talk credits

    RETURNS
    attributions (list of str): The names of the sources, sorted
    """
    # General source names are de-duplicated; the fixed credits below are appended as-is
    unique_names = {entry["name"] for entry in general_sources}
    credits = list(unique_names)

    conditional_credits = (
        ("nba_teams" in sports_tracked, "NBA API"),
        ("nhl_teams" in sports_tracked, "NHL API"),
        (weather_source == "nws", "National Weather Service API"),
        (weather_source == "env_canada", "Environment Canada Weather API"),
        (stocks_used, "Yahoo Finance API"),
        (car_talk_used, "Car Talk credits"),
    )
    for applies, credit in conditional_credits:
        if applies:
            credits.append(credit)

    credits.sort()
    return credits

### Sports

In [None]:
def get_todays_nba_game(team_name, requests_timeout):
    """Call the NBA API to find out if a team is playing today.

    NOTE
    This updated version accounts for the limitation of using the NBA API's current day's scoreboard:
    the scoreboard isn't always updated until a certain hour in the morning, after FN may be run.
    The updated approach here looks at the whole year's schedule, including post-season. Adapted from : https://github.com/swar/nba_api/issues/296

    "Today" and the tipoff time are both evaluated in US/Eastern, so the answer doesn't depend on the
    timezone of the machine running the notebook.

    TODO: Clean and simplify. No need to use Pandas.

    ARGUMENTS:
    team_name (str): NBA team such as "Celtics" or "Lakers"
    requests_timeout (int): Number of seconds to wait before giving up on an HTTP request

    RETURNS
    message (str or None): A headline-style update if the team is playing tonight.
        None if there is no single matching game today, or if anything went wrong (logged as a warning).
    """

    try:
        url = 'https://cdn.nba.com/static/json/staticData/scheduleLeagueV2.json'
        r = requests.get(url, timeout=requests_timeout)
        schedule = r.json()['leagueSchedule']['gameDates']

        # Collect one plain row per game, then build a single DataFrame from all of them
        rows = []
        for gameday in schedule:
            for game in gameday['games']:
                rows.append((
                    game['gameDateTimeUTC'],
                    game['homeTeam']['teamName'],
                    game['homeTeam']['teamCity'],
                    game['awayTeam']['teamName'],
                    game['awayTeam']['teamCity'],
                ))

        if not rows: # No games in the schedule
            return None

        games = pd.DataFrame(
            rows,
            columns=[
                "gameDateTimeUTC",
                "homeTeam",
                "homeCity",
                "awayTeam",
                "awayCity",
            ]
        )

        eastern = pytz.timezone('US/Eastern')
        games['gameDateTimeUTC'] = pd.to_datetime(games['gameDateTimeUTC'], errors='coerce')
        games = games.dropna(subset=['gameDateTimeUTC']).copy() # Unparseable times were coerced to NaT
        games['gameDateTimeEastern'] = games['gameDateTimeUTC'].apply(lambda t: t.astimezone(eastern))
        games['gameDate'] = games['gameDateTimeEastern'].apply(lambda d: d.date())

        # Game dates are Eastern dates, so compare them with today's date in the same timezone
        today_eastern = datetime.now(eastern).date()
        todays_games = games.loc[
            ((games['awayTeam'] == team_name) | (games['homeTeam'] == team_name))
            & (games['gameDate'] == today_eastern)
        ]
        if todays_games.shape[0] != 1:
            return None

        game = todays_games.iloc[0]
        tipoff = game["gameDateTimeEastern"].strftime("%I:%M").lstrip("0").replace(":00","") # e.g. "7:30" or "8"
        if game["homeTeam"] == team_name:
            other_team = game["awayTeam"]
            return f"🏀 The {team_name} host the {other_team} at {tipoff}."
        other_city = game["homeCity"]
        return f"🏀 The {team_name} are in {other_city}. Tipoff at {tipoff}."
    except Exception as e:
        logging.warning(f"NBA game error for {team_name}: {str(type(e))}, {str(e)}")
        return None


def get_todays_nhl_game(team_place_name, requests_timeout):
    """Call the NHL API to find out if a team is playing today.

    NOTE
    "Today" (both for the schedule request and for matching games) and the start time are evaluated
    in US/Eastern, so the answer doesn't depend on the timezone of the machine running the notebook.

    TODO: Clean and simplify. No need to use Pandas.

    ARGUMENTS:
    team_place_name (str): the team's official named place, like Buffalo, Minnesota. For Montréal use the accented e. For New York, use team_place_name of Islanders or Rangers
    requests_timeout (int): Number of seconds to wait before giving up on an HTTP request

    RETURNS
    message (str or None): A headline-style update if the team is playing tonight.
        None if there is no single matching game today, or if anything went wrong (logged as a warning).
    """

    new_york_teams = ["Islanders", "Rangers"] # These get "The ..." phrasing in the messages below
    try:
        eastern = pytz.timezone('US/Eastern')
        today_eastern = datetime.now(eastern).date()

        url = "https://api-web.nhle.com/v1/schedule/" + today_eastern.strftime("%Y-%m-%d")
        r = requests.get(url, timeout=requests_timeout)
        schedule = r.json()['gameWeek'][0]['games']

        # Collect one plain row per game, then build a single DataFrame from all of them
        rows = [
            (
                game['startTimeUTC'],
                game['homeTeam']['placeName']['default'],
                game['awayTeam']['placeName']['default'],
            )
            for game in schedule
        ]
        if not rows: # A day with no games in the league
            return None
        games = pd.DataFrame(
            rows,
            columns=[
                "gameDateTimeUTC",
                "home_place_name",
                "away_place_name",
            ]
        )
        games['gameDateTimeUTC'] = pd.to_datetime(games['gameDateTimeUTC'], errors='coerce')
        games = games.dropna(subset=['gameDateTimeUTC']).copy() # Unparseable times were coerced to NaT
        games['gameDateTimeEastern'] = games['gameDateTimeUTC'].apply(lambda t: t.astimezone(eastern))
        games['gameDate'] = games['gameDateTimeEastern'].apply(lambda d: d.date())

        todays_games = games.loc[
            ((games['away_place_name'] == team_place_name) | (games['home_place_name'] == team_place_name))
            & (games['gameDate'] == today_eastern)
        ]
        if todays_games.shape[0] != 1:
            return None

        game = todays_games.iloc[0]
        start_time = game["gameDateTimeEastern"].strftime("%I:%M").lstrip("0").replace(":00","") # e.g. "7:30" or "8"

        # Build the sentence from local names so team_place_name stays intact for the error log
        if game["home_place_name"] == team_place_name:
            opponent = game["away_place_name"]
            if team_place_name in new_york_teams:
                subject = f"The {team_place_name} host"
            else:
                subject = f"{team_place_name} hosts"
            if opponent in new_york_teams:
                opponent = f"the {opponent}"
            return f"🏒🥅 {subject} {opponent}. They face off at {start_time}."

        if team_place_name in new_york_teams:
            subject = f"The {team_place_name} are"
        else:
            subject = f"{team_place_name} skates"
        destination = game["home_place_name"]
        if destination in new_york_teams:
            destination = f"New York to face the {destination}"
        return f"🏒🥅 {subject} in {destination}. The puck drops at {start_time}."
    except Exception as e:
        logging.warning(f"NHL game error for {team_place_name}: {str(type(e))}, {str(e)}")
        return None

### Forecasts

In [None]:
def get_nws_forecast(nws_config):
    """Use National Weather Service API to get local forecast.

    NOTE: We use a fixed timeout for this API request, overriding publication_config's requests_timeout parameter.
    The value here is based on experience with NWS possibly needing a number of API attempts to get a response

    The request is retried (up to MAX_ATTEMPTS requests in total) while NWS answers with a non-200 status.
    An exception raised by the request itself is not retried; it is logged and None is returned.

    ARGUMENTS
    nws_config (dict): Parameters for calling the NWS API, including keys for:
        - office (str): Which NWS office to get the forecast from
        - grid_x (int), grid_y (int): Coordinates for the forecast
        - location_name (str): Optional, Town or city name (no state/country etc)
        - api_snooze_bar (int): How many seconds to wait before retrying NWS after a non-200 response

    RETURNS
    forecast (dict or None): Attributes of the forecast retrieved ("short", "detailed", "icon_url"),
        or None if there was a problem.
    """

    MAX_ATTEMPTS = 10
    r = None # Defined up front so the error handler can always report on it
    attempts = 0
    try:
        url = f"https://api.weather.gov/gridpoints/{nws_config['office']}/{nws_config['grid_x']},{nws_config['grid_y']}/forecast"
        while attempts < MAX_ATTEMPTS:
            attempts += 1
            r = requests.get(url, timeout=5)
            if r.status_code == 200:
                break
            if attempts < MAX_ATTEMPTS: # No point snoozing after the final attempt
                logging.info(f"Weather request {r.status_code}. Wait {nws_config['api_snooze_bar']} seconds and retry, take # {attempts + 1} ...")
                sleep(nws_config["api_snooze_bar"])

        if r.status_code != 200:
            logging.warning(f"No NWS forecast added. NWS returned status {r.status_code} on all {MAX_ATTEMPTS} attempts. Config: {nws_config}.")
            return None

        # Get the next daytime forecast
        # Traverse the list of forecast periods to find the first that isn't Overnight, ~Tuesday Night, Tonight, Evening
        daytime_forecasts = [
            period for period in r.json()["properties"]["periods"]
            if "night" not in period['name'].lower()
            and "evening" not in period['name'].lower()
        ]
        if not daytime_forecasts: # No daytime forecasts found
            logging.warning(f"No NWS forecast added because no non-night/overnight period available. Config: {nws_config}. Response from NWS: {r.json()}.")
            return None
        result = daytime_forecasts[0] # Get the daytime forecast that's coming first

        # Format forecast
        forecast = {
            "short": result.get("shortForecast", None),
            "detailed": result.get("detailedForecast", None),
            "icon_url": result.get("icon", None)
        }
        forecast["short"] = forecast["short"].capitalize() # Change from Title Case to Sentence case
        if "location_name" in nws_config:
            forecast["short"] += f" in {nws_config['location_name']}"
        return forecast
    except Exception as e:
        if r is None:
            logging.warning(f"Forecast error after {attempts} attempts: {str(type(e))}, {str(e)}. requests.get() did not return a response r.")
        else:
            logging.warning(f"Forecast error after {attempts} attempts: {str(type(e))}, {str(e)}, {r}")
        return None
    

def get_ca_forecast(forecast_config):
    """Fetch the local forecast from the Environment Canada API.

    The first sentence of the first forecast period becomes the short forecast; the rest of that
    period's summary becomes the detailed forecast.

    ARGUMENTS
    forecast_config (dict): Parameters for calling the env_canada API, including keys for:
        - lat (float), lon (float): Coordinates for the forecast
        - location_name (str): Optional, Town or city name (no province etc)

    RETURNS
    forecast (dict or None): Attributes of the forecast retrieved, or None if there was a problem.
    """

    try:
        coordinates = (forecast_config["lat"], forecast_config["lon"])
        weather = ECWeather(coordinates=coordinates)
        asyncio.run(weather.update())

        first_period = weather.daily_forecasts[0]
        headline, details = first_period["text_summary"].split(".", maxsplit=1)

        # A daytime first period is followed by tonight's forecast, so append it.
        # A night-time first period is followed by tomorrow's day forecast, which we leave out.
        first_is_daytime = "night" not in first_period["period"].lower()
        if first_is_daytime:
            tonight_summary = weather.daily_forecasts[1]["text_summary"]
            details = details + f"\n\nTonight: {tonight_summary}"

        if "location_name" in forecast_config:
            headline = headline + f" in {forecast_config['location_name']}"

        return {
            "short": headline,
            "detailed": details,
        }
    except Exception as err:
        logging.warning(f"env_canada forecast error: {str(type(err))}, {str(err)}")
        return None


def get_gws_forecast(forecast_config):
    """Pull latest forecast from German Weather Service's Open Data.

    The forecast is scraped from the <pre> tag of the configured file. The caller's forecast_config
    is left untouched: the scraper-only keys ("url", "tag") are added to a copy.

    ARGUMENTS
    forecast_config (dict): Parameters for getting GWS data, including keys for:
        - forecast_file (str): the name of the html file on "https://opendata.dwd.de/weather/text_forecasts/html/", the "LATEST" file for the desired region
        - location_name (str): Optional, Town or city name (no province etc)
        - api_timeout (int): Optional, Number of seconds to wait before giving up on a request (default 30)


    RETURNS
    forecast (dict or None): Attributes of the forecast retrieved, or None if there was a problem.
    """

    try:
        # Work on a copy so the subscriber's config isn't modified as a side effect
        scrape_config = dict(forecast_config)
        scrape_config["url"] = f"https://opendata.dwd.de/weather/text_forecasts/html//{forecast_config['forecast_file']}"
        scrape_config["tag"] = "pre"

        forecast = {
            # Unlike NWS and env_canada, we don't have a brief forecast to use in the heading. :(
            "short": "Weather forecast",

            # But we got a big honking forecast! Just needs some cleaning
            "detailed": (
                scrape_source(scrape_config, forecast_config.get("api_timeout", 30))
                [0]
                .strip("\r\n")
                .replace("\r\n\r\n", "</p><p>") # Blank lines become paragraph breaks
                .replace("\r\n", " ")
            )
        }
        if "location_name" in forecast_config:
            forecast["short"] += f" for {forecast_config['location_name']}"
        return forecast

    except Exception as e:
        logging.warning(f"German Weather Service forecast error: {str(type(e))}, {str(e)}, {forecast_config}")
        return None
    
    
def get_forecast(forecast_config):
    """Route a forecast request to the weather service named in the config.

    ARGUMENT
    forecast_config (dict): Parameters for calling the API. Its "source" key picks the service:
        "env_canada", "gws" or "nws". The remaining keys depend on that service.

    RETURNS
    forecast (dict or None): Attributes of the forecast retrieved, or None if there was a problem
        or the source is not recognized.
    """

    if forecast_config["source"] == "env_canada":
        return get_ca_forecast(forecast_config)
    if forecast_config["source"] == "gws":
        return get_gws_forecast(forecast_config)
    if forecast_config["source"] == "nws":
        return get_nws_forecast(forecast_config)

    # Nothing matched
    logging.warning(f"Unexpected forecast source. No forecast added. {forecast_config}")
    return None

### Stocks

In [None]:
def research_stock_history(ticker):
    """Retrieve the previous quarter of closing prices for one stock

    ARGUMENTS
    ticker (str): The abbreviation of the stock

    RETURNS
    stock_df (DataFrame): Last three months of closing prices in a single column named after the
        stock, indexed by "date" as month-day strings ("%m-%d")
    """
    stock = yf.Ticker(ticker)

    # Get stock name
    # This is the series name that will be displayed in plot.
    # Read .info once, and skip names that are missing, None or blank instead of failing on them.
    stock_info = stock.info or {}
    stock_name = stock_info.get("shortName") or stock_info.get("longName") or ticker
    stock_name = str(stock_name)
    if len(stock_name)<4: # If name is unexpectedly short, use ticker
        stock_name = ticker

    # Get price series
    # By default we get the last quarter, the max we may need
    stock_df = (
        stock
        .history(period="3mo")
        .reset_index()
        .assign(
            date = lambda df: df["Date"].dt.strftime("%m-%d"),
        )
        [["date", "Close"]]
    )
    stock_df[stock_name] = stock_df["Close"]

    return (
        stock_df
        .set_index(["date"])
        .drop(columns=["Close"])
    )


def research_stock_histories(tickers):
    """Collect the price history of several stocks into one table.

    ARGUMENTS
    tickers (list of str): The abbreviation (ticker) of each stock

    RETURNS
    stocks_df (DataFrame): Prices with each stock as a column and mon-day (str) as index.
        Columns are ordered by their maximum price, highest first.
    """

    histories = []
    for ticker in tickers:
        histories.append(research_stock_history(ticker))

    combined = pd.concat(histories, axis=1)

    # Put the stock with the highest peak price in the first column
    columns_by_peak = combined.max().sort_values(ascending=False).index
    return combined.loc[:, columns_by_peak]


def plot_stocks(stocks_df, history="quarter", dev_mode=False):
    """Create a plot for stock prices.

    Draws one line per column of stocks_df on a dark background, labels each line with its column
    name and with its first and last values, and returns the figure as a base64-encoded PNG.

    ARGUMENTS
    stocks_df (DataFrame): Prices, with each stock as a column and mon-day (str) as index
    history (str): How long in the past to plot. "quarter" plots every row, "month" the last 30 rows, "week" the last 7 rows
    dev_mode (bool): If we're in dev/debug, output the plots to local files too.

    RETURNS
    png_b64 (str): The PNG image as base64, or None (after logging a warning) if `history` is not one of the values above

    """
    if history=="quarter":
        # Tick for every 30th row, minus the last of those, plus the final row. Set de-dups if necessary
        ticks = pd.Index(
            set(
                list(stocks_df.index[::30])[0:-1] + [stocks_df.index[-1]]
            )
        )
    elif history=="month":
        stocks_df = stocks_df.tail(30)
        # Tick for every 7th row, minus the last of those, plus the final row. Set de-dups if necessary
        ticks = pd.Index(
            set(
                list(stocks_df.index[::7])[0:-1] + [stocks_df.index[-1]]
            )
        )
    elif history=="week":
        stocks_df = stocks_df.tail(7)
        # A tick for every day
        ticks = list(stocks_df.index)
    else:
        logging.warning(f"Unexpected value of `history` in plot_stocks(): {history}")
        return None

    fig = plt.figure(figsize=(8,5))
    plt.style.use("dark_background")
    sns.lineplot(data=stocks_df, palette="husl", dashes=False, lw=4)
    sns.despine()
    plt.tight_layout()
    # The y-axis is scaled from the FIRST column only, so this assumes the caller put the
    # highest-priced stock first -- TODO confirm for callers other than research_stock_histories()
    plot_max_y = stocks_df.iloc[:, 0].max() # Max of biggest ticker
    _ = plt.ylim(0, 1.2 * plot_max_y) # Max of biggest ticker + 20%

    # Plot tick marks
    plt.xticks(
        ticks, # Set() de-dups if necessary
        rotation=45,
        horizontalalignment='right',
        fontweight='light'
    )
    ax = plt.gca() # Get current axis

    # Add text labels
    # ax.lines[stock_i] is used to reuse each line's color; this relies on the lines being in column order
    for stock_i, stock_name in enumerate(stocks_df.columns):
        ticker_s = stocks_df[stock_name].dropna() # Ignore dates where this stock has no price

        # Add stock name, to the right of the line's last point
        ax.annotate(
            xy=(ticker_s.index[-1], ticker_s.iloc[-1]),
            xytext=(30,-5),
            textcoords='offset points',
            text=ticker_s.name, # The name of the Series = full name of stock, else ticker
            fontsize=20,
            color=ax.lines[stock_i].get_color(),
            ha='left',
        )

        # Add data labels for the first and last points, rounded to whole numbers
        for i in [0, -1]:
            ax.annotate(
                xy=(ticker_s.index[i], ticker_s.iloc[i]),
                xytext=(0, 20), # Place text 20 points above each data point
                textcoords='offset points',
                text=int(round(ticker_s.iloc[i],0)),
                fontsize=18 if i ==-1 else 14, # Latest value is larger than the starting value
                color=ax.lines[stock_i].get_color(),
                ha='center',
                va='top'
            )

    plt.legend([],[], frameon=False) # Remove legend
    ax.set_xlabel(None) # Remove "Date" name of X axis

    # Get raw image, rendered into memory rather than to disk
    png_bytes = BytesIO()
    plt.savefig(png_bytes, format = "png", bbox_inches='tight')
    png_bytes.seek(0) # Rewind so the read() below starts at the beginning

    if dev_mode:
        # File is named after the plotted columns and written to the current working directory
        plt.savefig(f"stocks_{'_'.join(stocks_df.columns)}.png", format = "png", bbox_inches='tight')

    # Close the figure once the image has been saved
    plt.close(fig)
    del fig

    return base64.b64encode(png_bytes.read()).decode()


def get_stocks_plot(tickers, section_frequency="monthly", dev_mode=False):
    """Research the given stocks and plot them for the issue.

    ARGUMENTS
    tickers (list of str): The abbreviation (ticker) of each stock
    section_frequency (str): How often we are reporting stocks in issue. Determines how far in the past to plot.
    dev_mode (bool): If we're in dev/debug, output the plots to local files too.

    RETURNS
    stocks_plot (base64): Image for a single plot of tickers, or None if section_frequency is not recognized
    """
    # The less often the plot is delivered, the more historical context it shows
    history_for_frequencies = (
        ("quarter", ["monthly"]),
        ("month", ["every_other_week"]),
        ("week", ["daily", "weekdays", "weekly"]),
    )
    history = None
    for candidate_history, frequencies in history_for_frequencies:
        if section_frequency in frequencies:
            history = candidate_history
            break

    if history is None:
        logging.warning(f"Unexpected section_frequency in get_stocks_plot(): {tickers}, {section_frequency}")
        return None

    return plot_stocks(research_stock_histories(tickers), history, dev_mode)

### Events

In [None]:
def extract_tag_class(element, soup, config):
    """Helper function to locate an HTML element's content by class and parse into a string.

    ARGUMENT
    element (str): Internal Finite News name of the element. The CSS class to search for is read
        from config under the key "<element>_class"
    soup (BeautifulSoup object): The parsed HTML to search
    config (dict): The calendar_config dictionary describing the web calendar and how we'll process it

    RETURNS
    element_str (str): The stripped text of the desired element. Empty string if config has no class
        for this element, or if no tag with that class is present in the soup
    """

    class_name = f"{element}_class"
    if class_name not in config:
        return ""
    tag = soup.find(class_=config[class_name])
    # find() returns None when nothing matches; treat that as blank instead of raising AttributeError
    if tag is None:
        return ""
    return tag.text.strip()


def extract_event_details(event_soup, calendar_config):
    """Parse an event description from HTML to structured data.

    ARGUMENTS
    event_soup (BeautifulSoup object): Parsed HTML for the event
    calendar_config (dict): Description of the website, calendar structure, and configuration.
        Optional keys used here: "image_html_class", "placeholder_image_src" (with
        "placeholder_image_replacement_url"), "link_url_class" and "link_url_child_key"

    RETURNS
    event (dict): Description of event with keys required for rendering in issue:
        "title", "venue", "dates", "description", "image_html" and "link_url".
        Details that aren't configured or aren't found in the HTML are empty strings.
    """

    event = {}

    # Extract text descriptions about the event
    for element in ["title", "venue", "dates", "description"]:
        event[element] = extract_tag_class(element, event_soup, calendar_config)

    # Extract thumbnail image
    event["image_html"] = ""
    if "image_html_class" in calendar_config:
        image_tag = event_soup.find(class_=calendar_config["image_html_class"])
        if image_tag is not None: # find() returns None when the event has no image
            event["image_html"] = image_tag
            if "placeholder_image_src" in calendar_config:
                # Swap the site's generic placeholder image for our configured replacement
                if calendar_config["placeholder_image_src"] in image_tag.get("src", ""):
                    event["image_html"] = calendar_config["placeholder_image_replacement_url"]

    # Extract link
    # Both keys are looked up in the config dict itself
    event["link_url"] = ""
    if "link_url_class" in calendar_config and "link_url_child_key" in calendar_config:
        link_tag = event_soup.find(class_=calendar_config["link_url_class"])
        if link_tag is not None: # find() returns None when the event has no link
            event["link_url"] = link_tag.get(calendar_config["link_url_child_key"], "")

    return event


def scrape_calendar_page(url_base, page, event_item_tag, event_list_class, requests_timeout):
    """Fetch one page of a web calendar and pick out its event tags.

    ARGUMENTS
    url_base (str): The url for the calendar, with {PAGE} as a placeholder
    page (int): The page to request
    event_item_tag (str): The HTML tag where each event is stored
    event_list_class (str): The element CSS class for those event tags
    requests_timeout (int): Number of seconds to wait before giving up on an HTTP request

    RETURNS
    page_soup (list of BeautifulSoup tags): Every tag on the page matching event_item_tag and
        event_list_class. None if the request or parsing failed (a warning is logged).
    """

    try:
        page_url = url_base.replace("{PAGE}", str(page))
        page_html = requests.get(page_url, timeout=requests_timeout).text
        parsed_page = BeautifulSoup(page_html, "html.parser")
        event_tags = parsed_page.find_all(event_item_tag, class_=event_list_class)
        return event_tags
    except Exception as err:
        logging.warning(f"scrape_calendar_page: {str(type(err))}, {str(err)}. {page_url}")
        return None


def scrape_calendar(calendar_config, requests_timeout):
    """Pull content from a web calendar. Handle multi-page calendars.

    Pages are requested one after another, starting at page 1, until a page yields no events.
    To guarantee the loop ends, a calendar whose URL has no {PAGE} placeholder is fetched only once,
    and a paginated calendar is cut off after "max_pages" pages.

    ARGUMENTS
    calendar_config (dict): Description of the website, calendar structure, and configuration, including:
        - url_base (str): Calendar URL; may contain {START_DATE}, {END_DATE} and {PAGE} placeholders
        - window (int): Number of days from today to include
        - event_item_tag (str), event_list_class (str): How to locate each event on a page
        - max_pages (int): Optional, most pages to request from a paginated calendar (default 50)
    requests_timeout (int): Number of seconds to wait before giving up on an HTTP request

    RETURNS
    calendar_events (list of dict): List of event descriptions
    """

    DEFAULT_MAX_PAGES = 50

    today = datetime.today()
    start_date = today.strftime('%m-%d-%Y')
    end_date = (
        (today + timedelta(days=calendar_config["window"]))
        .strftime('%m-%d-%Y')
    )
    url_base = (
        calendar_config["url_base"]
        .replace("{START_DATE}", start_date)
        .replace("{END_DATE}", end_date)
    )

    # Without a {PAGE} placeholder every request would return the same page, forever
    paginated = "{PAGE}" in url_base
    max_pages = calendar_config.get("max_pages", DEFAULT_MAX_PAGES) if paginated else 1

    calendar_events = []
    page = 1
    while page <= max_pages:
        page_soup = scrape_calendar_page(
            url_base,
            page,
            calendar_config["event_item_tag"],
            calendar_config["event_list_class"],
            requests_timeout
        )
        if not page_soup: # An empty (or failed) page means we've run out of events
            return calendar_events
        calendar_events.extend(
            extract_event_details(event_soup, calendar_config) for event_soup in page_soup
        )
        page += 1

    if paginated:
        logging.warning(f"scrape_calendar: stopped after {max_pages} pages without reaching an empty page. {url_base}")
    return calendar_events

        
def format_event(event):
    """Render one event as a table row

    The title, venue, dates, description and link are scraped text, so they are HTML-escaped before
    being placed in the row. image_html is inserted as-is because it is expected to already be HTML.

    ARGUMENT
    event (dict): Description of event, with keys "title", "venue", "dates", "description",
        "image_html" and "link_url"

    RETURNS
    event_row (str): HTML table row describing that event, or '' if the title is shorter than 2 characters
    """
    from html import escape # Local import so this function doesn't depend on the notebook's import cell

    if len(event['title'])<2:
        return ''

    title = escape(str(event['title']))
    venue = escape(str(event['venue']))
    dates = escape(str(event['dates']))
    description = escape(str(event['description']))

    # Only wrap the title in a link when there is somewhere to go
    if event['link_url']:
        link_url = escape(str(event['link_url']), quote=True)
        title_html = f'<a href="{link_url}">{title}</a>'
    else:
        title_html = title

    return f"""
    <tr>
       <td>
           {event['image_html']}
       </td>
       <td>
           <h4>{title_html}</h4>
           <p><b>{venue}</b></p>
           <p><b><i>{dates}</i></b></p>
           <p>{description}</p>
           <br>
        </td>
    </tr>
    """


def get_calendar_events(calendar_config, requests_timeout):
    """Scrape a website calendar and render its events as one HTML table.

    ARGUMENTS
    calendar_config (dict): Description of the website, calendar structure, and configuration.
        An optional "max_events" limits how many events are rendered.
    requests_timeout (int): Number of seconds to wait before giving up on an HTTP request

    RETURNS
    calendar_html (str): List of events formatted as an HTML table, with newlines removed
    """

    events = scrape_calendar(calendar_config, requests_timeout)

    # Limit total events if requested
    max_events = calendar_config.get("max_events")
    if max_events:
        events = events[:max_events]

    rows = ''.join(format_event(event) for event in events)
    table_html = f"""
                <table>
                    {rows}
                </table>
            """
    return table_html.replace("\n","")

### Misc sources


In [None]:
def get_mbta_alerts(route, station_ids, direction_id, requests_timeout):
    """Ask the MBTA API for current alerts affecting a route at the given station(s).
    
    ARGUMENTS
    route (str): The mbta id of the route. Browse at https://api-v3.mbta.com/routes
    station_ids (list of str): The mbta ids of the station, either parent station ID from https://api-v3.mbta.com/stops, or get the end of the URL like https://www.mbta.com/stops/place-sstat
    direction_id (int): 0 for outbound, 1 for inbound
    requests_timeout (int): Number of seconds to wait before giving up on an HTTP request
    
    RETURNS
    alerts (list of str): One prefaced line per alert header. Empty if no route or no stations were given.
    
    """
    # Without both a route and at least one station there is nothing to look up
    if not (route and station_ids):
        return []

    stops = ','.join(station_ids)
    url = f"https://api-v3.mbta.com/alerts?filter[route]={route}&filter[stop]={stops}&filter[direction_id]={direction_id}"
    payload = requests.get(url, timeout=requests_timeout).json()

    alerts = []
    for alert in payload["data"]:
        header = alert['attributes']['header'].strip()
        alerts.append(f"🚂 MBTA ruh-roh: {header}")
    return alerts


def get_car_talk_credit(bucket_path):
    """Pick one random Car Talk credit from the credits CSV.
    
    NOTE
    - These credits are fake staff credits that were used at the end of each episode of
    the National Public Radio automotive advice radio show, Car Talk
    - They came from downloading https://www.cartalk.com/content/staff-credits.

    ARGUMENTS 
    bucket_path (str): The location of the S3 bucket where required files are stored.
                       "car_talk_credits.csv" is appended to it directly, so it must end with a separator.

    RETURNS
    car_talk_credit (str): The fields of one randomly sampled row, joined with ": ". 
                           A fake staff member to thank for creation of this issue of Finite News :D
    """
    
    # The file has no header row; every row is a credit
    credits_table = pd.read_csv(bucket_path + "car_talk_credits.csv", header=None)
    random_row = credits_table.sample(1)
    fields = random_row.values.flatten().tolist()
    return ": ".join(fields)


def get_screenshots(sources):
    """Placeholder for capturing screenshots of web page elements. Not currently working on SM. Disabled.

    NOTE
    While disabled, this ignores `sources` and always returns an empty list.
    The earlier Selenium-based implementation is kept as commented-out code right after this function.

    ARGUMENTS
    sources (list): Unused for now. NOTE(review): the commented-out implementation reads "url", "element_class"
                    and "element_number" from each source; confirm the expected schema before re-enabling.

    RETURNS
    screenshots (list): Always []
    """
    return []
#     options = Options()
#     options.add_argument('headless')
#     s=Service(ChromeDriverManager().install())
#     driver = webdriver.Chrome(service=s, options=options)
#     driver.maximize_window()

#     screenshots = []
#     for source in sources:
#         url = source["url"]
#         driver.get(url)
#         try:
#             elements = driver.find_elements(By.CLASS_NAME, source["element_class"])
#             if source.get("automate_gradually", False):
#             # TODO: Temporary workaround for Birdcast. There's surely a better way
#                 b64_screenshots = [element.screenshot_as_base64 for element in elements]
#                 screenshot_b64 = b64_screenshots[source["element_number"]]
#             else:       
#                 # The simpler way that should work for nondynamically loaded images
#                 chart_element = elements[source["element_number"]]
#                 screenshot_b64 = chart_element.screenshot_as_base64
#         except Exception as e:
#             logging.warning(f"Selenium error on {source['url']}: {str(type(e))}, {str(e)}")
#         screenshots.append(screenshot_b64)
#         driver.quit()
#     return screenshots

## ✂️ Edit
Refine the news and other reporting results

In [None]:
def remove_emojis(text):
    """Utility function that strips every emoji out of a string.

    ARGUMENTS
    text (str): A string

    RETURNS
    text with emojis deleted, and with leading/trailing whitespace trimmed 
    (so removing an emoji at either end doesn't leave a dangling space)

    """
    without_emojis = emoji.replace_emoji(text, replace="")
    return without_emojis.strip()


def cache_issue_content(content, bucket_path, cache_path):
    """Save this issue's headlines and other content, one item per line, so the next issue can avoid repeating them.
    
    NOTE: Must call this before edit_research so we carry forward repeats that were dropped too

    ARGUMENTS
    content (list of str): Headlines or other content items that shouldn't be repeated in subsequent issues
    bucket_path (str): The location of the S3 bucket where required files are stored.
    cache_path (str): The path on the S3 bucket for this subscriber's cache of last issue's headlines

    RETURNS
    None
    """
    
    destination = bucket_path + cache_path
    # Mode "w": the cache is overwritten, it only ever holds the latest issue
    with fs.open(destination, "w") as cache_file:
        for line in (f"{item}\n" for item in content):
            cache_file.write(line)
        logging.info(f"Wrote issue content to {destination}")
            

def apply_one_headline_keyword_filter(headlines, keyword):
    """Allow at most one headline mentioning this keyword: keep the first, drop later ones.

    Matching is a case-insensitive substring test.

    ARGUMENTS
    headlines (list of str): Headlines from all sources
    keyword (str): The word or phrase that may appear in only one headline

    RETURNS
    new_headlines (list of str): The headlines in their original order, minus every headline after 
                                 the first that contains the keyword
    """
    
    needle = keyword.lower()
    already_matched = False
    kept = []
    for headline in headlines:
        # Plain substring match. A tokenizer would be more precise but hasn't been needed in practice.
        if needle in headline.lower():
            if already_matched:
                continue
            already_matched = True
        kept.append(headline)
    return kept


def remove_items_in_last_issue(new_items, bucket_path, cache_path):
    """Delete content that we already presented in the last issue.
    
    Ignores emojis in the comparison. That way a preface emoji (could be changed in publication_config) alone
    wouldn't prevent a match with an identical headline in the cache (with the old preface) 

    TODO
    Ignore the entire preface in this comparison. Even better, don't add the preface till after editing headlines.
    
    ARGUMENTS
    new_items (list of str): Fresh content
    bucket_path (str): The location of the S3 bucket where required files are stored.
    cache_path (str): The path on the S3 bucket for this subscriber's cache of the last issue's content
    
    RETURNS
    fresh_items (list of str): Content from new_items that was not in the last issue
    """
    # A set gives constant-time lookups for the membership test below
    last_issue_items = {remove_emojis(line) for line in load_s3(bucket_path, cache_path)}

    fresh_items = []
    removed_items = []
    for item in new_items:
        # Compare emoji-free text on both sides, and log exactly the items this test rejects
        if remove_emojis(item) in last_issue_items:
            removed_items.append(item)
        else:
            fresh_items.append(item)
    logging.info(f"Removed items that were in last issue: {removed_items}") 
    return fresh_items


def unnest_list(l_of_ls):
    """Flatten a list of lists into one list, e.g. the headlines from every source we researched.
    
    ARGUMENTS
    l_of_ls (list of lists): Sublists to combine. Empty or None sublists are skipped.

    RETURNS
    l (list of str): Flat list of the items from all sublists, in order
    
    """
    flat = []
    for sublist in l_of_ls:
        if sublist: # Skip sublists that are None or empty
            flat.extend(sublist)
    return flat


def lower_list(l):
    """Helper function that lowercases every string in a list.
    
    ARGUMENTS
    l (list of str): A list of headlines
    
    RETURNS
    l_lower (list of str): A list of lowercase headlines, or None when `l` is empty or None
    """
    
    return [item.lower() for item in l] if l else None


def breaks_rule(headline, cant_begin_with, cant_contain, cant_end_with):
    """Evaluate whether a headline breaks any of the passed sets of editorial rules
    
    ARGUMENTS
    headline (str): The text to evaluate
    cant_begin_with (list of str): Text that a headline cannot start with. None or empty means no such rules.
    cant_contain (list of str): Text that cannot exist anywhere in a headline. None or empty means no such rules.
    cant_end_with (list of str): Text that a headline cannot end with. None or empty means no such rules.
    
    RETURNS
    True if this headline violates any rule, otherwise False
    """
    
    # Ignore emojis, which often preface the headline (and interfere with cant_begin_with)
    # TODO: Edit headlines before adding prefaces
    headline_clean = remove_emojis(headline)
    
    # `or []` tolerates a rule list passed as None instead of raising TypeError
    return (
        any(headline_clean.startswith(phrase) for phrase in cant_begin_with or [])
        or any(phrase in headline_clean for phrase in cant_contain or [])
        or any(headline_clean.endswith(phrase) for phrase in cant_end_with or [])
    )

        
def apply_substance_rules(headlines, substance_rules):
    """Remove headlines that fail our logic for ensuring a headline is substantive.

    Rules and headlines are both lowercased before comparing, so matching is case-insensitive.

    ARGUMENTS
    headlines (list of str): The headlines retrieved from all sources
    substance_rules (dict): The editorial rules, which consist of lists of phrases under the optional keys
                            "cant_begin_with", "cant_contain" and "cant_end_with"
    
    RETURNS
    kept_headlines (list of str): The headlines that pass all substance rules.

    """
    # lower_list() returns None for an empty/missing rule list; fall back to [] so breaks_rule can iterate it
    cant_begin_with = lower_list(substance_rules.get("cant_begin_with", [])) or []
    cant_contain = lower_list(substance_rules.get("cant_contain", [])) or []
    cant_end_with = lower_list(substance_rules.get("cant_end_with", [])) or []

    kept_headlines = []
    removed_headlines = []
    for headline in headlines:
        if breaks_rule(headline.lower(), cant_begin_with, cant_contain, cant_end_with):
            removed_headlines.append(headline)
        else:
            kept_headlines.append(headline)
    logging.info(f"Substance rules removed: {removed_headlines}")
    return kept_headlines 


def smart_dedup(headlines, smart_dedup_config, prefaces_to_ignore=None):
    """Use semantic de-duping to avoid showing two headlines about the same news events, even if they use different words.
    
    If anything goes wrong (model can't load, bad config, etc.) we log a warning and return the headlines untouched.

    ARGUMENTS
    headlines (list of str): The headlines from research
    smart_dedup_config (dict): Publication's settings for using the smart deduplication. 
                               Reads "model" (name passed to SentenceTransformer) and "threshold" (minimum cosine similarity to count as duplicates).
    prefaces_to_ignore (list): A list of repeated prefaces that may appear at beginnings of headlines, if we want smart_dedup to ignore them when computing headline similarity.
                               So we don't get high similarity just because two headlines start with "🍻 FiniteBrews: " for example.
                               None or empty means no prefaces are removed.

    RETURNS
    deduped_headlines (list of str): Headlines after de-duplication, in their original order
    """
    try:       
        # Set things up
        prefaces = set(prefaces_to_ignore or [])
        model = SentenceTransformer(smart_dedup_config["model"])
        threshold = smart_dedup_config["threshold"]

        # First, temporarily remove prefaces to headlines. Only the copies used for embedding are changed.
        # TODO: Refactor so that FiniteNews doesn't add prefaces until after editing. Then we can avoid this hokey pokey move.
        headlines_clean = headlines
        for preface in prefaces:
            headlines_clean = [h.replace(preface, "").strip() for h in headlines_clean]
        logging.info(f"Smart_deduper prefaces to ignore: {prefaces}")
            
        # Second, find pairs of headlines that are semantically similar
        # We'll get their sentence embeddings and use cosine_similarity
        embeddings = model.encode(headlines_clean, convert_to_tensor=True)
        similarity_matrix = cos_sim(embeddings, embeddings)
        
        # Track duplicates by position, not by text. That way identical strings at different positions
        # stay distinct items, and we never need to map text back to the full headline afterwards.
        n_headlines = embeddings.shape[0]
        dup_pairs = [
            (i, j)
            for i in range(n_headlines)
            for j in range(n_headlines)
            # A headline isn't compared to itself, and the pair's semantic similarity must meet threshold
            if i != j and similarity_matrix[i, j] >= threshold
        ]

        if not dup_pairs:
            logging.info("Smart deduper: no semantic dups found")
            return headlines

        # Third, apply the transitive property of headline similarity! :D
        # Given pairs of headlines flagged as semantically similar, find the minimum set of unique items.
        # We assume if (headline A similar to headline B) and (B similar to C), we keep A and drop B and C
        keep_idx = set()
        drop_idx = set()
        for first, second in dup_pairs:
            first_seen = first in keep_idx or first in drop_idx
            second_seen = second in keep_idx or second in drop_idx
            if first_seen or second_seen:
                # By transitive property, an unseen item is similar to a seen item so we won't keep it.
                if not first_seen:
                    drop_idx.add(first)
                if not second_seen:
                    drop_idx.add(second)
            else:
                # Never seen either item before: keep the first, drop the second
                keep_idx.add(first)
                drop_idx.add(second)

        dups_found = [[headlines[i], headlines[j]] for i, j in dup_pairs]
        keepers = [headlines[i] for i in sorted(keep_idx)]
        droppers = [headlines[i] for i in sorted(drop_idx)]
        logging.info(f"Smart dededuper found the following pairs of headlines that met threshold: {dups_found}")
        logging.info(f"Smart deduper kept: {keepers}. Removed: {droppers}")
        return [h for i, h in enumerate(headlines) if i not in drop_idx]
    
    except Exception as e:
        # Best effort: a broken deduper should never cost us the issue
        logging.warning(f"Smart deduper failed: {str(type(e))}, {str(e)}")
        return headlines

    
def openai_chat_completion(gpt_config, message):
    """Send one prompt to the OpenAI GPT chat endpoint and return the text of the reply.
    
    ARGUMENTS
    gpt_config (dict): Parameters for using the API. Reads "substance_filter_model" and "system_role".
    message (str): The full prompt to send GPT, including generic lead-in, headlines, and instruction (customized to each subscriber)
    
    RETURNS
    headlines_to_remove_str (string): GPT's response of which headlines to remove, in str format
    """
    
    model_name = gpt_config["substance_filter_model"]
    chat_messages = [
        {"role":"system", "content": gpt_config["system_role"]},
        {"role": "user", "content": message}
    ]
    response = openai.ChatCompletion.create(
        model=model_name,
        messages=chat_messages,
    )
    # Only the first choice's message content is used
    first_choice = response["choices"][0]
    return first_choice["message"]["content"]


def apply_substance_filter_model(headlines, gpt_config):
    """Use LLM to remove headlines that don't say much useful.
    
    If the API call fails (after one retry on connection errors), we log a warning and keep every headline.

    NOTE
    Requires an OPENAI_API_KEY in AWS Secrets Manager.
    
    ARGUMENTS
    headlines (list): List of string headlines, original candidates for the issue
    gpt_config (dict): Configuration for editing headlines using GPT LLM through the Open AI API.
                       Reads "instruction" here; the rest is used by openai_chat_completion.
    
    RETURNS
    kept_headlines (list): The headlines that GPT did not remove
    """
    
    GPT_RETRY_SLEEP = 30
    BULLET = "* "
    openai.api_key = get_fn_secret("OPENAI_API_KEY")
    headlines_for_gpt = [f"{BULLET}{headline}" for headline in headlines]
    lead_in = "Here are today's news headlines:"
    message = lead_in + "\n" + "\n".join(headlines_for_gpt) + "\n" + gpt_config["instruction"]
    try:
        try:
            headlines_to_remove_str = openai_chat_completion(gpt_config, message)
        except openai.error.APIConnectionError:
            # Connection hiccups are often transient: wait and try once more
            logging.info(f"OpenAI API error. Waiting {GPT_RETRY_SLEEP} secs, retrying...")
            sleep(GPT_RETRY_SLEEP)
            headlines_to_remove_str = openai_chat_completion(gpt_config, message)
            logging.info("Retry worked! 😅")
    except Exception as e:
        logging.warning(f"OpenAI failed: {str(type(e))}, {str(e)}")
        return list(headlines) # No verdict from GPT, so nothing gets removed

    # Collect each response line as-is, trimmed, and without the bullet we added to the prompt,
    # so an echoed "* headline" or stray whitespace still matches the headline we sent.
    headlines_to_remove = set()
    for line in (headlines_to_remove_str or "").split("\n"):
        trimmed = line.strip()
        headlines_to_remove.update((line, trimmed))
        if trimmed.startswith(BULLET):
            headlines_to_remove.add(trimmed[len(BULLET):].strip())

    removed_headlines = [headline for headline in headlines if headline in headlines_to_remove] # Extra QC step to make sure GPT didn't return a hallucination that wasn't in headlines we sent it.
    logging.warning(f"GPT removed: {removed_headlines}") 
    return [headline for headline in headlines if headline not in removed_headlines]


def clean_headline(headline, enforce_trailing_period=True):
    """Normalize the text formatting of one headline.
    
    NOTE
    - Assumes we have already stripped white space from beginning and end of headline
    - We apply these steps before applying substance rules, which rely on standard format,
    before checking if these headlines were in the last issue, and before caching this issue's headlines.

    ARGUMENTS
    headline (str): A single headline.
    enforce_trailing_period (bool): Whether to ensure headlines end in a period. True for news and alerts. False for image sections.
    
    RETURNS
    headline (str): A single, clean headline.
    """ 
    
    replacements = (
        ("’", "'"),    # Standardize apostrophe characters
        ("‘", "'"),
        ("\xa0", " "), # Non-breaking space unicode
    )
    for old, new in replacements:
        headline = headline.replace(old, new)

    # Add a period unless the headline already ends with terminal punctuation
    if enforce_trailing_period and not headline.endswith((".", "?", "!")):
        headline += "."
    return headline


def edit_headlines(raw_headlines,
                   issue_config,
                   filter_for_substance=True,
                   smart_deduplicate=True,
                   enforce_trailing_period=True,
                   sources_type="news_sources"):
    """Apply all editorial policies to the headlines.
    
    Steps, in order: drop repeats from the last issue, clean formatting, apply the one-headline keyword filters,
    apply substance rules and (if configured) the LLM substance filter, then semantic de-duplication.

    ARGUMENTS
    raw_headlines (list): List of string headlines, original candidates for the issue
    issue_config (dict): The settings for the issue
    filter_for_substance (bool): Apply rules ± LLM to remove non-substantive headlines
    smart_deduplicate (bool): When headlines have similar meaning, only keep one in the set.
    enforce_trailing_period (bool): Whether to ensure headlines end in a period. True for news and alerts. False for image sections.
    sources_type (str): The name of the key in issue_config to get the lists of sources for these headlines, so we can find their prefaces used.
    
    RETURNS
    edited_headlines (list): Headlines after filtering ones that violate editorial policies.
                             If raw_headlines is empty or None it is returned as-is.
    """
    
    if not raw_headlines:
        return raw_headlines
    editorial = issue_config["editorial"]

    # Apply deterministic cleaning first
    edited_headlines = remove_items_in_last_issue(raw_headlines, issue_config["bucket_path"], editorial["cache_path"])
    edited_headlines = [clean_headline(headline, enforce_trailing_period) for headline in edited_headlines] # Do after removing repeats, since we cache the raw uncleaned
    for keyword in editorial["one_headline_keywords"]:
        edited_headlines = apply_one_headline_keyword_filter(edited_headlines, keyword) # No list comprehension because each cycle can change edited_headlines

    if edited_headlines and filter_for_substance:
        edited_headlines = apply_substance_rules(edited_headlines, editorial["substance_rules"])
        # Then probabilistic LLM cleaning
        # Start with substance filter model. Remove ones that aren't great headlines.
        gpt_config = editorial["gpt"]
        if not gpt_config:
            logging.info("Did not apply LLM substance model. GPT not configured.")
        elif edited_headlines: # Don't pay for an API call if the rules already removed everything
            edited_headlines = apply_substance_filter_model(edited_headlines, gpt_config)

    # Finally, smart deduplicate (by semantic similarity) the remaining headlines that are individually fine options.
    if edited_headlines and smart_deduplicate and editorial["smart_deduper"]:
        prefaces_to_ignore = [source["preface"] for source in issue_config[sources_type] if source.get("preface")]
        edited_headlines = smart_dedup(
            edited_headlines,
            editorial["smart_deduper"],
            prefaces_to_ignore
        )
    logging.info("Edited headlines: " + str(edited_headlines))
    return edited_headlines


def edit_sports_headlines(headlines, teams):
    """Tidy up game headlines so that a game between two tracked teams is reported once, not twice.
    
    ARGUMENTS
    headlines (list of str): News related to today's game(s) for tracked teams. May contain None/empty entries, which are skipped.
    teams (list of str): The names of the tracked teams that may be in the headlines.
    
    RETURNS
    cleaned_headlines (list of str): Harmonized news about today's game(s)
    """
    
    reported_teams = set()
    unique_games = []
    for headline in filter(None, headlines): # Avoid [None] lists
        mentioned = {team for team in teams if team in headline}
        # Keep the headline only if none of its teams appeared in an earlier headline
        if reported_teams.isdisjoint(mentioned):
            unique_games.append(headline)
        reported_teams |= mentioned
    return unique_games

## 🎨 Design
Lay out the email

In [None]:
def get_weather_emoji(forecast):
    """
    Pick an emoji that matches a weather forecast. It's used to spice up the section header.
    
    ARGUMENTS
    forecast (str): Text of the forecast. Matched case-insensitively against keywords.
    
    RETURNS
    emoji (str): The emoji for the first matching rule, or a crystal ball if nothing matches
    """

    # Checked top to bottom, first match wins. Order matters:
    # - rain must come after snow, for snow showers
    # - sunny must come after mostly sunny, and cloudy after mostly cloudy
    rules = (
        (("tornado",), "🌪️"),
        (("hurricane",), "🌀"),
        (("thunder", "lightning"), "⚡"),
        (("snow", "flurries"), "❄️"),
        (("rain", "pour", "shower", "drizzle"), "☔"),
        (("hot",), "🥵"),
        (("freezing",), "🥶"),
        (("partly cloudy", "mostly sunny"), "🌤️"),
        (("sunny", "beautiful", "warm"), "😎"),
        (("mostly cloudy",), "🌥️"),
        (("cloudy",), "☁️"),
        (("windy",), "🌬️"),
        (("fog", "smoke"), "😶‍🌫️"),
    )
    text = forecast.lower()
    for keywords, symbol in rules:
        if any(keyword in text for keyword in keywords):
            return symbol
    return "🔮"


def populate_template(template_text, placeholder, new_content, html_list=None, condition=True):
    """Swap a placeholder in a template for real content, or remove the placeholder if there's nothing to show.
    
    ARGUMENTS
    template_text (str): The text from a template we are filling in.
    placeholder (str): The characters in the template where content goes
    new_content (str): The content to add to the template, or None
    html_list (list): Optional, items to append after `new_content` as an HTML <ul> list. If a list is passed and it is empty, we remove the placeholder altogether.
    condition (object, bool, or None): Optional, if falsy we replace placeholder with "" regardless of `new_content`. Usually this means some _piece_ of new_content must be non-Null, or we should remove the entire section
    
    RETURNS
    populated_text (str): The text with placeholder filled in or removed
    """
    # Nothing below will have an effect if the template lacks the placeholder, so flag it in the logs
    if placeholder not in template_text: 
        logging.warning(f"Template does not contain section so we cannot populate it. Placeholder: {placeholder}. Content to populate: {new_content}. Template text state: {template_text}")

    if not (new_content and condition): # Covers new_content of None as well as ""
        replacement = ""
    elif type(html_list) is not list:
        # Plain section, no list of items involved
        replacement = new_content
    elif html_list:
        list_items = ''.join([f'<li>{item}</li>' for item in html_list])
        replacement = new_content + "<ul>" + list_items + "</ul>"
    else:
        # A list section with no items: drop the whole section, header included
        replacement = ""

    return template_text.replace(placeholder, replacement)


def format_issue(
    issue_config,
    content,
    log_stream=None
):
    """Organize the final content as HTML for one subscriber's issue.
    
    ARGUMENTS
    issue_config (dict): The settings for the issue
    content (dict): The content to go into an issue, with these keys (although their values may be None/[]):
        - headlines (list of str): The final news headlines to be reported in this issue
        - alerts (list of str): Alerts to be reported in this issue
        - forecast (dict): Forecast content, if any, with keys "short", "detailed" and optionally "icon_url"
        - events_html (str): HTML-formatted section with upcoming events
        - stock_plots (list): Stock plot images. Only the count is used here, to reference the attachments' cids.
        - images (list): All images attached to the email. Stock plots are assumed to come first. 
        - image_urls (list of str): HTML snippets joined into the images section as-is
    log_stream (String IO): Optional, the log report from running Finite News
    
    RETURNS
    html (str): The Finite News template populated with the final content
    """
        
    html = issue_config["layout"]["template_html"]
    
    html = populate_template(html, "[[LOGO_URL]]", issue_config["layout"]["logo_url"])
    # `or [""]` also covers a slogans key that is present but empty/None, which choice() can't handle
    html = populate_template(html, "[[SLOGAN]]", choice(issue_config.get("slogans") or [""]))
    html = populate_template(html, "[[HEADLINES_BLOCK]]", "<h3>🗞️ News</h3>", content["headlines"])
    html = populate_template(html, "[[ALERTS_BLOCK]]", "<h3>🚨 Alert weeoooweeooo</h3>", content["alerts"])

    forecast = content["forecast"]
    if forecast:
        weather_emoji = get_weather_emoji(forecast["short"])
        # Quote the src value: URLs can contain characters (like =) that aren't valid in an unquoted attribute
        weather_icon = f"<img src='{forecast['icon_url']}' alt='Forecast icon'><br>" if "icon_url" in forecast else ""
        weather_block = f"<h3>{weather_emoji} {forecast['short']}</h3>{weather_icon}<p>{forecast['detailed']}</p>"
    else:
        weather_block = ""
    html = populate_template(html, "[[WEATHER_BLOCK]]", weather_block)

    html = populate_template(html, "[[EVENTS_BLOCK]]", f"<h3>🪩 Upcoming events</h3>{content['events_html']}", condition=content["events_html"])

    # Images are attached to the email and referenced by cid: stock plots first, then the other images
    n_stock_plots = len(content["stock_plots"])
    n_other_images = len(content["images"]) - n_stock_plots
    stocks_block = "".join([f"<img src='cid:image_{i}' alt='image_{i}'><br>" for i in range(n_stock_plots)])
    html = populate_template(html, "[[STOCKS_BLOCK]]", f"<h3>💰 Financial update</h3>{stocks_block}", condition=stocks_block)
    images_block = "".join([f"<img src='cid:image_{i}' alt='image_{i}'><br>" for i in range(n_stock_plots, n_stock_plots + n_other_images)]) # cids continue after the stock plots
    image_urls_block = ''.join(content["image_urls"])
    html = populate_template(html, "[[IMAGES_BLOCK]]", f"<h3>📸 Finstagram</h3>{images_block}{image_urls_block}", condition=images_block+image_urls_block)

    try:
        thoughts = issue_config["thoughts_of_the_day"]
        has_thoughts = len(thoughts)>0 # Raises TypeError if the config value has no length, e.g. None
        thought = choice(thoughts) if has_thoughts else None # choice() can't handle []
        html = populate_template(html, "[[THOUGHT_OF_THE_DAY]]", f"""<h3>💭 Thought for the day</h3><p>{thought}</p>""", condition=has_thoughts)
    except TypeError as e:
        logging.warning(f"TypeError on replace closing thoughts. Yaml malfored?: {e}. thoughts_of_the_day type: {type(issue_config['thoughts_of_the_day'])}. Expected string. {issue_config['thoughts_of_the_day']}")
        html = populate_template(html, "[[THOUGHT_OF_THE_DAY]]","")

    # Credits section
    car_talk_block = "<p>" + get_car_talk_credit(issue_config["bucket_path"]) + "</p><br>" if issue_config["editorial"]["add_car_talk_credit"] else "" # We need this car_talk_block variable later too
    html = populate_template(html, "[[CAR_TALK_CREDIT]]", car_talk_block)
    attributions=get_attributions(
        general_sources=issue_config["news_sources"] + issue_config["events_sources"] + issue_config["alerts_sources"] + issue_config["image_sources"] ,
        sports_tracked=issue_config["sports"],
        weather_source=issue_config["forecast"].get("source", None),
        stocks_used=n_stock_plots>0,
        car_talk_used=issue_config["editorial"]["add_car_talk_credit"]
    )
    html = populate_template(html, "[[ATTRIBUTIONS]]", "<p><i>Sources used: " + ", ".join(attributions) + "</i></p>", condition=attributions)
    html = populate_template(html, "[[CREDITS_INTRO]]", "<h3>💝 Credits</h3>" if (len(attributions) + len(car_talk_block)) > 0 else "") # No attributions or Car Talk credits? No credits section at all!
    
    # Append logs to admin's email, if there are any log lines to show
    log_items = [l for l in log_stream.getvalue().split("\n") if len(l)>0] if log_stream else []
    html = populate_template(html, "[[LOGGING_BLOCK]]", f"<h3>👾 Logs</h3>", log_items, condition=issue_config["admin"] is True and log_items)
    
    return html

## 📰 Publish
Orchestrate and deliver the news

In [None]:
def email_issue(sender, subscriber_email, html, images):
    """Email one issue of Finite News to a subscriber through the SendGrid API service.
    
    Failures are logged as critical rather than raised.

    NOTE
    Requires a secret in AWS Secret Manager for SENDGRID_API_KEY
    
    ARGUMENTS
    sender (dict): Metadata about the email source, with keys for "subject" and "email"
    subscriber_email (str): The email address for the destination
    html (str): The issue content
    images (list): png images to attach inline to the email. Each is passed as the attachment's file_content
                   (presumably base64-encoded; verify against caller). Attachment i gets content id "image_{i}".
    
    RETURNS
    None
    """
    
    from_address = sender["email"]
    today = date.today().strftime("%m.%d.%y").lstrip("0")
    message = Mail(
        from_email=from_address,
        to_emails=subscriber_email,
        subject=f"{sender['subject']} for {today}",
        html_content=html
    )
    # Inline attachments, so the html can reference them as cid:image_0, cid:image_1, ...
    message.attachment = [
        Attachment(
            disposition='inline',
            file_name=f'image_{position}.png',
            file_type='image/png',
            file_content=image,
            content_id=f'image_{position}',
        )
        for position, image in enumerate(images)
    ]
    try:
        client = SendGridAPIClient(get_fn_secret('SENDGRID_API_KEY'))
        response = client.send(message)
        if response.status_code==202:
            logging.info(f"{subscriber_email}: Extry extry! Email is away!")
    except Exception as e:
        # Admin issue will get this logging line in its email about failures in prior, non-admin issues.
        logging.critical(f"{subscriber_email}: Error in send_email: {str(type(e))}, {str(e)}")


def deliver_issue(issue_config, html, images):
    """Send the content of Finite News to one subscriber by the selected method

    If issue_config["email_delivery"] is truthy the issue is emailed. Otherwise it is appended to a local
    text file named for today's date, and `images` is not used.

    ARGUMENTS
    issue_config (dict): The settings for the issue. Reads "subscriber_email", "email_delivery" and, when emailing, "sender".
    html (str): The content of the email formatted for the email
    images (list): Optional, images to attach to the email
    
    RETURNS
    None
    """
    subscriber = issue_config['subscriber_email']
    logging.info(f"{subscriber}: Starting deliver_issue()")
    if issue_config["email_delivery"]:
        email_issue(
            issue_config["sender"],
            subscriber,
            html,
            images
        )
        return

    # Write issue to file
    # Append issue's html to local .txt file that collect the day's issues. Creates file if it doesn't exist.
    now = datetime.now() # One timestamp, so the file name and the entry always agree
    issues_path = f"issues_for_{now.strftime('%m-%d-%y')}.txt"
    # Issues contain emojis, so don't depend on the platform's default encoding
    with open(issues_path, "a", encoding="utf-8") as f:
        f.write(f"""{subscriber}\n{now.strftime('%m-%d-%y %H:%M:%S')}\n{html}\n--------------------------------------------\n""")
    logging.info(f"{subscriber}: Extry extry! Wrote to text file.")


def create_issue(issue_config, log_stream, dev_mode=False):
    """Populate the content of Finite News customized for one subscriber

    Gathers each section in turn (news headlines, sports, alerts, forecast, events, stock plots, other images),
    optionally caches the unedited items so they are not repeated in the next issue, then formats everything with format_issue().

    ARGUMENTS
    issue_config (dict): The settings for the issue
    log_stream (StringIO object): In-memory file-like object that collects results from logging during the Finite News run
    dev_mode (bool): If we're in dev/debug, output plots to local files too.

    RETURNS
    html (str): The content of the issue formatted for the email
    images (list): Images to attach to the email: the stock plots followed by the screenshots
    """

    subscriber_email = issue_config["subscriber_email"]
    logging.info(f"{subscriber_email}: Starting create_issue()")

    # Get news
    news_headlines = [research_source(source, issue_config["requests_timeout"]) for source in issue_config["news_sources"]]
    news_headlines = unnest_list(news_headlines)
    news_headlines = dedup(news_headlines) # Dedup at source and aggregate level here too, because sometimes we get same headline from multiple sources, like if we pull from multiple sections of the same site
    # Start collecting items we don't want to repeat in the next issue. Do before editing, which dedups using that cache.
    # Copy rather than alias: the `+=` steps below extend content_to_cache in place, which would otherwise also grow news_headlines,
    # and the cache should keep the unedited originals regardless of what edit_headlines() does with the list it is given.
    content_to_cache = list(news_headlines)
    headlines = edit_headlines(news_headlines, issue_config)

    # Sports: Get tonight's games for tracked teams
    # Note: These are not added to content_to_cache, so they are not cached in cache_path file or de-duped from the last issue
    sports_config = issue_config["sports"]
    nba_teams = sports_config.get("nba_teams", [])
    nba_headlines = [get_todays_nba_game(nba_team, issue_config["requests_timeout"]) for nba_team in nba_teams]
    nba_headlines = edit_sports_headlines(nba_headlines, nba_teams)
    nhl_teams = sports_config.get("nhl_teams", [])
    nhl_headlines = [get_todays_nhl_game(team_place_name, issue_config["requests_timeout"]) for team_place_name in nhl_teams]
    nhl_headlines = edit_sports_headlines(nhl_headlines, nhl_teams)
    headlines = nba_headlines + nhl_headlines + headlines # Sports go ahead of the news headlines

    # Get Alerts, a separate kind of headline that get edited more lightly
    alerts = [research_source(source, issue_config["requests_timeout"]) for source in issue_config["alerts_sources"]]
    alerts = unnest_list(alerts)
    content_to_cache += alerts # Cache alerts so we don't repeat them in the next issue. Do before editing, which checks that cache.
    alerts = edit_headlines(alerts, issue_config, filter_for_substance=False, smart_deduplicate=False) # Remove exact repeats, but don't try to remove non-substantive content or smart dedup (MBTA can have multiple, semantically similar alerts)

    # Forecast is optional: skipped when the subscriber's "forecast" setting is falsy
    forecast = get_forecast(issue_config["forecast"]) if issue_config["forecast"] else None

    # Get Events section in HTML
    # Skip falsy results (e.g. None) so one source that returned nothing cannot break the join.
    # The image_urls step below filters research_source() results the same way.
    events_sections = [research_source(source, issue_config["requests_timeout"]) for source in issue_config["events_sources"]]
    events_html = "".join(section for section in events_sections if section)

    # Get Stock plot images, if requested by subscriber (an empty "stocks" list yields no plots)
    stock_plots = [
        get_stocks_plot(tickers_set, issue_config["stocks_frequency"], dev_mode)
        for tickers_set in issue_config["stocks"]
    ]

    # Other images
    # A. images that we need to attach to the email
    screenshot_sources = [source for source in issue_config["image_sources"] if source["type"]=="screenshot"]
    images = stock_plots + get_screenshots(screenshot_sources)

    # B. image_urls that we don't need to attach, the <img> html is sufficient
    image_urls = [research_source(source, issue_config["requests_timeout"]) for source in issue_config["image_sources"] if source["type"]=="image_url"]
    image_urls = unnest_list([element for element in image_urls if element])
    content_to_cache += image_urls
    image_urls = edit_headlines(
        image_urls,
        issue_config,
        filter_for_substance=False,
        smart_deduplicate=False,
        enforce_trailing_period=False
    ) # Don't show the image if it was in the last issue

    # Cache content: the unedited news headlines, alerts, and image_urls. (Sports headlines are not cached, see the note in the Sports step.)
    # Do so with originals before removing repeats, cleaning, or applying substance filters.
        # That way, when checking cache for repeats, we compare unedited to unedited (same punctuation etc).
        # Also GPT filtering is nondeterministic. We need to remove repeats before GPT changes the pool.
    # But update the cache _after_ edit_headlines(), which uses the cache to dedup and, at that point, needs to be the last issue's content, not this issue's.
    if issue_config["editorial"]["cache_issue_content"]:
        cache_issue_content(
            content_to_cache,
            issue_config["bucket_path"],
            issue_config["editorial"]["cache_path"]
        )

    # Pull it all together
    html = format_issue(
        content = {
            "headlines": headlines,
            "alerts": alerts,
            "forecast": forecast,
            "events_html": events_html,
            "stock_plots": stock_plots,
            "images": images,
            "image_urls": image_urls
        },
        issue_config=issue_config,
        log_stream=log_stream
    )
    logging.info(f"{subscriber_email}: Finished create_issue()")
    return html, images

# Run

In [None]:
def run_finite_news(dev_mode, disable_gpt, logging_level):
    """Entry point to create and deliver issues to all subscribers of Finite News.

    Each subscriber's issue is created and delivered inside its own try block, so outside dev mode
    a failure in one issue is logged and the run moves on to the next subscriber.

    ARGUMENTS
    dev_mode (bool): If True we're in development or debug mode, so don't send emails or modify headline_logs, and also output plots to local files.
    disable_gpt (bool): If True, don't call the GPT API and incur costs, for example during dev or debug cycles.
    logging_level (str): The deepest granularity of log messages to track, e.g. "warning", "info" or "debug". Passed to init_logging().

    RETURNS
    None
    """

    log_stream = init_logging(logging_level, dev_mode)
    for subscriber_config in tqdm(load_subscriber_configs(dev_mode, disable_gpt)):
        try:
            html, images = create_issue(subscriber_config, log_stream, dev_mode)
            deliver_issue(subscriber_config, html, images)
        except Exception:
            if dev_mode: # During dev or debugging, re-raise the active exception so the traceback shows in the notebook.
                raise
            # In prod mode, save traceback for admin's issue, but continue to try to publish the next issue.
            # Look the address up with .get(): if the failure was a missing "subscriber_email" key,
            # indexing it here would raise again inside the handler and stop the remaining issues.
            subscriber_email = subscriber_config.get("subscriber_email", "unknown subscriber")
            logging.critical(f"{subscriber_email}: Issue failed due to unhandled exception. {traceback.format_exc()}")
    print("👍")

In [None]:
# Kick off the run with the notebook-level parameters, passed by name so each setting is explicit.
run_finite_news(
    dev_mode=DEV_MODE,
    disable_gpt=DISABLE_GPT,
    logging_level=LOGGING_LEVEL,
)