<a href="https://colab.research.google.com/github/brandonjloeffler-lab/Semester-Project/blob/main/collect_data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import requests
import pandas as pd
import os
import time
from datetime import datetime

# --- BLS API connection settings -------------------------------------------
BLS_API_URL = "https://api.bls.gov/publicAPI/v2/timeseries/data/"

# The registration key is read from the BLS_API_KEY environment variable.
# NOTE(review): the fallback below is a literal key committed to the
# repository; consider rotating it and supplying the key only through the
# environment.
BLS_API_KEY = os.environ.get("BLS_API_KEY", "88317efad228417fb8b93e6d0796cb8e")

# --- Series to collect ------------------------------------------------------
# Maps each BLS series ID to the column name used in the output table.
# Insertion order is the order in which the IDs are sent to the API.
SERIES_MAP = {
    "LNS14000000": "Unemployment_Rate_SA",
    "CES0000000001": "Total_Nonfarm_Employment_SA",
    # NOTE(review): CES0500000003 appears to be average *hourly earnings*
    # (weekly hours is CES0500000002) -- confirm the ID/label pairing.
    "CES0500000003": "Avg_Weekly_Hours_Private_SA",
    "PRS85006092": "Output_Per_Hour_NF",
    "CUUR0000SA0L1E": "CPI_U_Ex_Food_Energy_U",
    "EIUIR": "Imports_All_Commodities_U",
    "EIUIQ": "Exports_All_Commodities_U",
}

# --- Collection window and output location ----------------------------------
# The window runs from five years before the current calendar year through
# the current year (both bounds are passed to the API as-is).
_LOOKBACK_YEARS = 5
END_YEAR = datetime.now().year
START_YEAR = END_YEAR - _LOOKBACK_YEARS

DATA_FILE_PATH = "data/bls_data.csv"

def get_bls_data(series_ids, start_year, end_year, timeout=30):
    """Fetch time-series data from the BLS API for the given series and years.

    Args:
        series_ids: List of BLS series ID strings to request.
        start_year: First year of the requested range (converted to str).
        end_year: Last year of the requested range (converted to str).
        timeout: Seconds to wait for the HTTP request before giving up, so a
            stalled connection cannot hang the caller indefinitely.

    Returns:
        The list found at ``Results.series`` in the API response when the
        response status is ``REQUEST_SUCCEEDED``; ``None`` on any transport
        error, HTTP error status, non-JSON body, or unexpected/failed
        response (a diagnostic is printed in each case).
    """
    headers = {'Content-type': 'application/json'}
    payload = {
        "seriesid": series_ids,
        "startyear": str(start_year),
        "endyear": str(end_year),
        "catalog": False,
        "calculations": False,
        "annualaverage": False
    }
    # Only send the key when one is configured, rather than posting an
    # empty/null "registrationkey" field.
    if BLS_API_KEY:
        payload["registrationkey"] = BLS_API_KEY

    try:
        response = requests.post(
            BLS_API_URL, headers=headers, json=payload, timeout=timeout
        )
        response.raise_for_status()
        json_data = response.json()
    except requests.exceptions.RequestException as e:
        print(f"An error occurred during API request: {e}")
        return None
    except ValueError as e:
        # Older versions of requests raise a plain ValueError for a body
        # that is not valid JSON.
        print(f"BLS API returned a non-JSON response: {e}")
        return None

    if not isinstance(json_data, dict):
        print(f"BLS API Error: {json_data}")
        return None

    status = str(json_data.get('status', '')).strip()
    if status != 'REQUEST_SUCCEEDED':
        print(f"BLS API Error: {json_data}")
        return None

    # Surface any advisory messages attached to the response instead of
    # silently dropping them.
    for note in json_data.get('message') or []:
        print(f"BLS API message: {note}")

    results = json_data.get('Results')
    series = results.get('series') if isinstance(results, dict) else None
    if series is None:
        print(f"BLS API Error: {json_data}")
        return None
    return series

# Quarterly periods are dated at the last month of the quarter.
_QUARTER_END_MONTH = {'Q01': 3, 'Q02': 6, 'Q03': 9, 'Q04': 12}


def _period_to_month(period):
    """Return the calendar month (1-12) for a BLS period code, else None."""
    if period.startswith('M'):
        try:
            month = int(period[1:])
        except ValueError:
            return None
        # Codes outside 1-12 (e.g. 'M13') are not real calendar months and
        # would otherwise produce an unparseable date string.
        return month if 1 <= month <= 12 else None
    # Any code not in the quarter table (including non-'Q' codes) is skipped.
    return _QUARTER_END_MONTH.get(period)


def process_data(series_results, series_map=None):
    """Pivot raw BLS series results into one DataFrame row per date.

    Args:
        series_results: Iterable of dicts, each with a ``'seriesID'`` key and
            a ``'data'`` list whose items carry ``'year'``, ``'period'`` and
            ``'value'``.
        series_map: Optional mapping of series ID to output column name.
            Defaults to the module-level ``SERIES_MAP``. IDs missing from the
            mapping use the series ID itself as the column name.

    Returns:
        A DataFrame with a datetime ``'Date'`` column (first day of the
        month; quarters use the quarter's last month) plus one column per
        series, sorted ascending by date. Values that cannot be converted
        to float are stored as missing. Items whose period is not a month
        1-12 or a quarter Q01-Q04 are skipped.
    """
    if series_map is None:
        series_map = SERIES_MAP

    records = {}

    for series in series_results:
        series_id = series['seriesID']
        column_name = series_map.get(series_id, series_id)

        for item in series['data']:
            month = _period_to_month(item['period'])
            if month is None:
                continue

            date_str = f"{item['year']}-{month:02d}-01"
            try:
                value = float(item['value'])
            except (TypeError, ValueError):
                # Non-numeric placeholders or a missing value.
                value = None

            records.setdefault(date_str, {})[column_name] = value

    df = pd.DataFrame.from_dict(records, orient='index')
    df.index.name = 'Date'
    df = df.reset_index()
    df['Date'] = pd.to_datetime(df['Date'])

    df = df.sort_values(by='Date').reset_index(drop=True)
    return df

def initial_data_collection():
    """Fetch the configured BLS series and save them to DATA_FILE_PATH as CSV.

    Requests every series in SERIES_MAP for START_YEAR..END_YEAR, converts
    the result with process_data, creates the output directory if the path
    has one, and writes the table without the index column. Prints a status
    line on start, on success, and on failure. Returns None.
    """
    print(f"Starting initial data collection from {START_YEAR} to {END_YEAR}...")

    series_data = get_bls_data(list(SERIES_MAP), START_YEAR, END_YEAR)
    if not series_data:
        print("Initial data collection failed.")
        return

    df_final = process_data(series_data)

    # os.makedirs('') raises FileNotFoundError, so only create a directory
    # when the output path actually contains one.
    output_dir = os.path.dirname(DATA_FILE_PATH)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    df_final.to_csv(DATA_FILE_PATH, index=False)
    print(f"Successfully collected {len(df_final)} historical records and saved to {DATA_FILE_PATH}")


if __name__ == "__main__":
    initial_data_collection()


Starting initial data collection from 2020 to 2025...
Successfully collected 69 historical records and saved to data/bls_data.csv
