In [16]:
import logging
import os
import re
import sys
import warnings
from io import BytesIO
from zipfile import ZipFile

import numpy as np
import pandas as pd
import requests
from bs4 import BeautifulSoup
from tqdm.auto import tqdm

warnings.filterwarnings("ignore", category=UserWarning, module="openpyxl")

logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger(__name__)


def get_fomc_urls(from_year=1999, switch_year=None):
    if switch_year is None:
        from datetime import datetime
        switch_year = datetime.now().year - 5
    
    calendar_url = "https://www.federalreserve.gov/monetarypolicy/fomccalendars.htm"
    r = requests.get(calendar_url)
    soup = BeautifulSoup(r.text, "html.parser")
    contents = soup.find_all(
        "a", href=re.compile("^/newsevents/pressreleases/monetary\\d{8}[ax].htm")
    )
    urls_ = [content.attrs["href"] for content in contents]

    for year in range(from_year, switch_year):
        fomc_yearly_url = (
            f"https://www.federalreserve.gov/monetarypolicy/fomchistorical{year}.htm"
        )
        r_year = requests.get(fomc_yearly_url)
        soup_yearly = BeautifulSoup(r_year.text, "html.parser")
        yearly_contents = soup_yearly.findAll("a", text="Statement")
        minutes=soup_yearly.find_all("p", text=re.compile("Minutes"))
        urls_.extend(
            [a.attrs["href"] for p in yearly_contents for a in p.find_all("a", text="HTML")]
        )
        print(minutes)
        for minute in minutes:
            htmls = minute.find_all("a", text="HTML")
            print(htmls)

    urls = ["https://www.federalreserve.gov" + url for url in urls_]
    return urls


def sent_cleaner(s):
    return s.replace("\n", " ").replace("\r", " ").replace("\t", " ").strip()


def bs_cleaner(bs, html_tag_blocked=None):
    if html_tag_blocked is None:
        html_tag_blocked = [
            "style", "script", "[document]", "meta", "a", "span", "label", 
            "strong", "button", "li", "h6", "font", "h1", "h2", "h3", "h5", 
            "h4", "em", "body", "head", "sup",
        ]
    return [
        sent_cleaner(t)
        for t in bs.find_all(text=True)
        if (t.parent.name not in html_tag_blocked) & (len(sent_cleaner(t)) > 0)
    ]


regexp = re.compile(r"\s+", re.UNICODE)

def feature_extraction(corpus, sent_filters=None):
    if sent_filters is None:
        sent_filters = [
            "Board of Governors", "Federal Reserve System",
            "20th Street and Constitution Avenue N.W., Washington, DC 20551",
            "Federal Reserve Board - Federal Reserve issues FOMC statement",
            "For immediate release", "DO NOT REMOVE:  Wireless Generation",
            "For media inquiries", "or call 202-452-2955.", "Voting",
            "For release at", "Last Update", "Last update",
        ]

    text = [
        " ".join(
            [
                regexp.sub(" ", s)
                for i, s in enumerate(c)
                if (i > 1) & np.all([q not in s for q in sent_filters])
            ]
        )
        for c in corpus
    ]

    release_date = [pd.to_datetime(c[1].replace("Release Date: ", "")) for c in corpus]
    last_update = [
        pd.to_datetime(
            [
                s.replace("Last update:", "").replace("Last Update:", "").strip()
                for s in c
                if "last update: " in s.lower()
            ][0]
        )
        for c in corpus
    ]
    voting = [" ".join([s for s in c if "Voting" in s]) for c in corpus]
    release_time = [
        " ".join(
            [s for s in c if ("For release at" in s) | ("For immediate release" in s)]
        )
        for c in corpus
    ]

    return pd.DataFrame(
        {
            "release_date": release_date,
            "last_update": last_update,
            "text": text,
            "voting": voting,
            "release_time": release_time,
        }
    )


def load_fomc_statements(
    add_url=True, cache_dir="data", force_reload=False, progress_bar=False, from_year=1999
):
    filename = os.path.join(cache_dir, "fomc_statements.parquet")
    if os.path.exists(filename) & (~force_reload):
        logger.info(f"logging from cache file: {filename}")
        statements = pd.read_parquet(filename)
    else:
        logger.info("loading from external source")
        urls = get_fomc_urls(from_year=from_year)
        if progress_bar:
            urls_ = tqdm(urls)
        else:
            urls_ = urls
        corpus = [
            bs_cleaner(BeautifulSoup(requests.get(url).text, "html.parser"))
            for url in urls_
        ]
        statements = feature_extraction(corpus).set_index("release_date")
        if add_url:
            statements = statements.assign(url=urls)
        statements = statements.sort_index()
        logger.info(f"saving cache file {filename}")
        statements.to_parquet(filename)
    return statements


In [3]:
statements=load_fomc_statements(progress_bar=True)

INFO:__main__:loading from external source


  0%|          | 0/219 [00:00<?, ?it/s]

INFO:__main__:saving cache file data/fomc_statements.parquet


In [10]:
statements.info(
)
statements['text'].values[0]

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 219 entries, 1999-05-18 to 2024-11-07
Data columns (total 5 columns):
 #   Column        Non-Null Count  Dtype         
---  ------        --------------  -----         
 0   last_update   219 non-null    datetime64[ns]
 1   text          219 non-null    object        
 2   voting        219 non-null    object        
 3   release_time  219 non-null    object        
 4   url           219 non-null    object        
dtypes: datetime64[ns](1), object(4)
memory usage: 10.3+ KB


"The Federal Reserve released the following statement after today's Federal Open Market Committee meeting: While the FOMC did not take action today to alter the stance of monetary policy, the Committee was concerned about the potential for a buildup of inflationary imbalances that could undermine the favorable performance of the economy and therefore adopted a directive that is tilted toward the possibility of a firming in the stance of monetary policy. Trend increases in costs and core prices have generally remained quite subdued. But domestic financial markets have recovered and foreign economic prospects have improved since the easing of monetary policy last fall. Against the background of already-tight domestic labor markets and ongoing strength in demand in excess of productivity gains, the Committee recognizes the need to be alert to developments over coming months that might indicate that financial conditions may no longer be consistent with containing inflation."

In [17]:
get_fomc_urls()

[]
[]
[]
[]
[]
[]
[]
[]
[]
[]


KeyboardInterrupt: 

In [13]:
import os
from mistralai import Mistral
from skfin.datasets import load_fomc_statements
from tqdm import tqdm

with open("../mistral_key.txt", "r") as f:
    api_key = f.read().strip()
def analyze_monetary_policy_hawkishness(api_key, statements):
    model = "mistral-large-latest"
    client = Mistral(api_key=api_key)

    results = []

    for text in tqdm(statements):
        chat_response = client.chat.complete(
            model=model,
            messages=[
                {
                    "role": "user",
                    "content": f"Act as a financial analyst. What is the monetary policy hawkishness of this text? \
    Please choose an answer from hawkish, dovish, neutral or unknown and provide a probability and a short explanation. \
        answer in this structure (no other text) : \n \
        label: hawkish, \n probability: 90%, \n explanation: The text contains a lot of positive words and is likely to be hawkish. \n \
    Text: {text}",
                },
            ]
        )

        response_message = chat_response.choices[0].message.content
        # Assuming the response message contains the label and probability in a specific format
        # You may need to adjust the parsing based on the actual response format
        # label = response_message.split(':')[1].split(',')[0].strip()
        # probability = response_message.split('probability')[1].split('%')[0].strip()

        results.append({ "text": response_message})

    return results

# statements = load_fomc_statements(force_reload=False, cache_dir="../nbs/data")

# Example usage:
""
# print(len(statements))

results = analyze_monetary_policy_hawkishness(api_key, statements)
# for i,result in enumerate(results):
#     print(f" i: {i} \n {result['text']} \n")
#     print('---'*20)

  0%|          | 0/219 [00:00<?, ?it/s]

INFO:httpx:HTTP Request: POST https://api.mistral.ai/v1/chat/completions "HTTP/1.1 200 OK"


  0%|          | 1/219 [00:03<12:58,  3.57s/it]

INFO:httpx:HTTP Request: POST https://api.mistral.ai/v1/chat/completions "HTTP/1.1 200 OK"


  1%|          | 2/219 [00:04<07:21,  2.04s/it]

INFO:httpx:HTTP Request: POST https://api.mistral.ai/v1/chat/completions "HTTP/1.1 429 Too Many Requests"


  1%|          | 2/219 [00:04<08:46,  2.43s/it]


SDKError: API error occurred: Status 429
{"message":"Requests rate limit exceeded"}