# Consumer Price Index (CPI) – Inflation



In [1]:
import requests
import pandas as pd

bls_url = "https://api.bls.gov/publicAPI/v2/timeseries/data/"

series_ids = ["CUUR0000SA0"]

payload = {
    "seriesid": series_ids,
    "startyear": "2015",
    "endyear": "2024"
}

response = requests.post(bls_url, json=payload, timeout=30)
response.raise_for_status()
data = response.json()

rows = []
for series in data["Results"]["series"]:
    for item in series["data"]:
        rows.append({
            "year": int(item["year"]),
            "period": item["period"],          # M01–M12
            "month": item["periodName"],
            "cpi_value": float(item["value"])
        })

cpi_df = pd.DataFrame(rows)

print("CPI rows:", len(cpi_df))
cpi_df.head(10)


CPI rows: 120


Unnamed: 0,year,period,month,cpi_value
0,2024,M12,December,315.605
1,2024,M11,November,315.493
2,2024,M10,October,315.664
3,2024,M09,September,315.301
4,2024,M08,August,314.796
5,2024,M07,July,314.54
6,2024,M06,June,314.175
7,2024,M05,May,314.069
8,2024,M04,April,313.548
9,2024,M03,March,312.332


# Current Employment Statistics (CES) – Employment by industry



In [2]:
import requests
import pandas as pd

bls_url = "https://api.bls.gov/publicAPI/v2/timeseries/data/"

series_map = {
    "CES3000000001": "Total Nonfarm Employment",        # MD Total Nonfarm
    "CES3000000006": "Manufacturing Employment",       # MD Manufacturing
    "CES3000000007": "Leisure and Hospitality Employment"  # MD Leisure
}

payload = {
    "seriesid": list(series_map.keys()),
    "startyear": "2015",
    "endyear": "2024"
}

response = requests.post(bls_url, json=payload, timeout=30)
response.raise_for_status()
data = response.json()

rows = []
for series in data["Results"]["series"]:
    series_id = series["seriesID"]
    industry = series_map.get(series_id, "Unknown")

    for item in series["data"]:
        rows.append({
            "series_id": series_id,
            "industry": industry,
            "year": int(item["year"]),
            "period": item["period"],           # M01–M12
            "month": item["periodName"],
            "employment_level": float(item["value"])
        })

ces_df = pd.DataFrame(rows)

print("CES rows:", len(ces_df))
ces_df.head(10)


CES rows: 360


Unnamed: 0,series_id,industry,year,period,month,employment_level
0,CES3000000001,Total Nonfarm Employment,2024,M12,December,12760.0
1,CES3000000001,Total Nonfarm Employment,2024,M11,November,12770.0
2,CES3000000001,Total Nonfarm Employment,2024,M10,October,12750.0
3,CES3000000001,Total Nonfarm Employment,2024,M09,September,12800.0
4,CES3000000001,Total Nonfarm Employment,2024,M08,August,12800.0
5,CES3000000001,Total Nonfarm Employment,2024,M07,July,12840.0
6,CES3000000001,Total Nonfarm Employment,2024,M06,June,12839.0
7,CES3000000001,Total Nonfarm Employment,2024,M05,May,12849.0
8,CES3000000001,Total Nonfarm Employment,2024,M04,April,12847.0
9,CES3000000001,Total Nonfarm Employment,2024,M03,March,12838.0


# Local Area Unemployment Statistics (LAUS) – Unemployment and labor force

In [3]:
import requests
import pandas as pd

bls_url = "https://api.bls.gov/publicAPI/v2/timeseries/data/"

series_map = {
    "LASST240000000000003": "Unemployment Rate",
    "LASST240000000000004": "Labor Force",
    "LASST240000000000005": "Employment"
}

payload = {
    "seriesid": list(series_map.keys()),
    "startyear": "2015",
    "endyear": "2024"
}

response = requests.post(bls_url, json=payload, timeout=30)
response.raise_for_status()
data = response.json()

rows = []
for series in data["Results"]["series"]:
    series_id = series["seriesID"]
    metric = series_map.get(series_id, "Unknown")

    for item in series["data"]:
        rows.append({
            "series_id": series_id,
            "metric": metric,
            "year": int(item["year"]),
            "period": item["period"],           # M01–M12
            "month": item["periodName"],
            "value": float(item["value"])
        })

laus_df = pd.DataFrame(rows)

print("LAUS rows:", len(laus_df))
laus_df.head(10)


LAUS rows: 360


Unnamed: 0,series_id,metric,year,period,month,value
0,LASST240000000000003,Unemployment Rate,2024,M12,December,3.1
1,LASST240000000000003,Unemployment Rate,2024,M11,November,3.1
2,LASST240000000000003,Unemployment Rate,2024,M10,October,3.2
3,LASST240000000000003,Unemployment Rate,2024,M09,September,3.2
4,LASST240000000000003,Unemployment Rate,2024,M08,August,3.2
5,LASST240000000000003,Unemployment Rate,2024,M07,July,3.2
6,LASST240000000000003,Unemployment Rate,2024,M06,June,3.1
7,LASST240000000000003,Unemployment Rate,2024,M05,May,3.0
8,LASST240000000000003,Unemployment Rate,2024,M04,April,2.9
9,LASST240000000000003,Unemployment Rate,2024,M03,March,2.9


# Data Transformation and Schema Alignment


In [4]:
# ---- CPI Transformation ----
cpi_df = cpi_df.copy()

cpi_df["year"] = cpi_df["year"].astype(int)
cpi_df["cpi_value"] = cpi_df["cpi_value"].astype(float)

cpi_df = cpi_df[["year", "period", "month", "cpi_value"]]


# ---- CES Transformation (Employment) ----
ces_df = ces_df.copy()

ces_df["year"] = ces_df["year"].astype(int)
ces_df["employment_level"] = ces_df["employment_level"].astype(float)

ces_df = ces_df[[
    "industry",
    "year",
    "period",
    "month",
    "employment_level"
]]


# ---- LAUS Transformation (Unemployment) ----
laus_df = laus_df.copy()

laus_df["year"] = laus_df["year"].astype(int)
laus_df["value"] = laus_df["value"].astype(float)

laus_df = laus_df.rename(columns={"value": "unemployment_rate"})

laus_df = laus_df[[
    "year",
    "period",
    "month",
    "unemployment_rate"
]]



display(cpi_df.head(), ces_df.head(), laus_df.head())


Unnamed: 0,year,period,month,cpi_value
0,2024,M12,December,315.605
1,2024,M11,November,315.493
2,2024,M10,October,315.664
3,2024,M09,September,315.301
4,2024,M08,August,314.796


Unnamed: 0,industry,year,period,month,employment_level
0,Total Nonfarm Employment,2024,M12,December,12760.0
1,Total Nonfarm Employment,2024,M11,November,12770.0
2,Total Nonfarm Employment,2024,M10,October,12750.0
3,Total Nonfarm Employment,2024,M09,September,12800.0
4,Total Nonfarm Employment,2024,M08,August,12800.0


Unnamed: 0,year,period,month,unemployment_rate
0,2024,M12,December,3.1
1,2024,M11,November,3.1
2,2024,M10,October,3.2
3,2024,M09,September,3.2
4,2024,M08,August,3.2


# Load Transformed Tables to BigQuery


In [5]:
from google.cloud import bigquery

PROJECT_ID = "inst767-project-481615"   
DATASET_ID = "inst767_sp25"             

client = bigquery.Client(project=PROJECT_ID)

dataset_ref = f"{PROJECT_ID}.{DATASET_ID}"
dataset = bigquery.Dataset(dataset_ref)
dataset.location = "US"
client.create_dataset(dataset, exists_ok=True)

tables = {
    "cpi_inflation": cpi_df,
    "ces_employment": ces_df,
    "laus_unemployment": laus_df
}

for table_name, df in tables.items():
    table_id = f"{PROJECT_ID}.{DATASET_ID}.{table_name}"
    job = client.load_table_from_dataframe(df, table_id)
    job.result()
    print(f"✅ Uploaded {table_id} ({len(df)} rows)")




✅ Uploaded inst767-project-481615.inst767_sp25.cpi_inflation (120 rows)




✅ Uploaded inst767-project-481615.inst767_sp25.ces_employment (360 rows)




✅ Uploaded inst767-project-481615.inst767_sp25.laus_unemployment (360 rows)


In [4]:

from google.cloud import pubsub_v1
import json

PROJECT_ID = "inst767-project-481615"
TOPIC_ID = "macro-indicators-ingest"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)

message = {
    "source": "macro_indicators_ingest",
    "status": "completed",
    "dataset": "inst767_sp25",
    "tables": ["cpi_inflation", "ces_employment", "laus_unemployment"]
}

# Publish message (async)
future = publisher.publish(
    topic_path,
    json.dumps(message).encode("utf-8")
)

print("Message published to Pub/Sub pipeline")


Message published to Pub/Sub pipeline
