# API Ingest & SYNC → AWS S3 
This notebook automates the retrieval of BLS/DataUSA data and stores it in Amazon S3. It uses the BLS Public API to retrieve data.
This sync version keeps S3 matched with the BLS website by detecting additions, updates, and deletions automatically:
* New files → get added.
* Changed files → get updated.
* Deleted files → get removed.
  
**[View Notebook (Foundational Version)](https://github.com/ScottySchmidt/AWS_DataEngineer_API/blob/main/01-ingest-apis-to-s3.ipynb)**

The foundational version laid the groundwork for this improved **Sync Version**, which mirrors the full BLS directory (adds, updates, and deletes) and supports optional targeted series syncs.

### What's Covered
- **Automated sync** from the source data API to S3.
- **No hardcoded file names** – dynamically scrapes the BLS file list.
- **403 error handling** – uses a valid User-Agent to comply with BLS access policy.
- **Cloud-based execution** – runs in Kaggle with secure secret management.
- **Secrets used** – AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION, BUCKET_NAME, BLS_API_KEY.
- **Duplicate protection** – checks content hashes before uploading.

### How It Works
1. Fetch the current list of files from the BLS public directory.
2. Download each file and compare its hash to the version in S3.
3. Upload new or changed files to the configured S3 bucket.
4. Skip unchanged files to save bandwidth and storage.
5. Delete S3 objects whose source file no longer exists in the BLS directory.
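
The steps above can be sketched as a pure planning function, separate from any network or S3 call. This is a minimal illustration, not the notebook's implementation: `plan_sync`, `md5_hex`, and the sample file names are hypothetical, and the stored hashes are assumed to have been collected from S3 beforehand.

```python
import hashlib

def md5_hex(data):
    """Hex MD5 digest of a bytes payload."""
    return hashlib.md5(data).hexdigest()

def plan_sync(upstream, stored_hashes):
    """Classify file names as add, update, skip, or delete.

    upstream:      dict of file name -> downloaded bytes
    stored_hashes: dict of file name -> MD5 hex digest of the copy in S3
    """
    plan = {"add": [], "update": [], "skip": [], "delete": []}
    for name, data in sorted(upstream.items()):
        if name not in stored_hashes:
            plan["add"].append(name)        # new upstream file
        elif stored_hashes[name] != md5_hex(data):
            plan["update"].append(name)     # content changed
        else:
            plan["skip"].append(name)       # identical, nothing to upload
    # Anything stored but no longer listed upstream is stale.
    plan["delete"] = sorted(set(stored_hashes) - set(upstream))
    return plan

# Hypothetical example data
upstream = {"pr.class": b"a", "pr.data": b"new"}
stored = {"pr.data": md5_hex(b"old"), "pr.old": md5_hex(b"x")}
plan = plan_sync(upstream, stored)
print(plan)
```

Keeping the decision logic free of I/O makes it easy to check the add/update/skip/delete behaviour before pointing it at a real bucket.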

## Connect to AWS S3
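This notebook uses the following Python packages:
- boto3
- requests
- pandas (preview cell only)
- hashlib (part of the Python standard library)
- kaggle_secrets (available inside Kaggle), which supplies the secrets AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION, BUCKET_NAME, BLS_API_KEY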

In [1]:
import os
import boto3

# Load secrets: Kaggle first, then env vars
try:
    from kaggle_secrets import UserSecretsClient
    secrets = UserSecretsClient()
    AWS_ACCESS_KEY_ID     = secrets.get_secret("AWS_ACCESS_KEY_ID")
    AWS_SECRET_ACCESS_KEY = secrets.get_secret("AWS_SECRET_ACCESS_KEY")
    AWS_REGION            = secrets.get_secret("AWS_REGION") or os.getenv("AWS_REGION", "us-east-1")
    BUCKET_NAME           = secrets.get_secret("BUCKET_NAME") or os.environ["BUCKET_NAME"]
except Exception:
    AWS_ACCESS_KEY_ID     = os.getenv("AWS_ACCESS_KEY_ID")
    AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
    AWS_REGION            = os.getenv("AWS_REGION", "us-east-1")
    BUCKET_NAME           = os.environ["BUCKET_NAME"]

session = boto3.Session(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    region_name=AWS_REGION
)
s3 = session.client("s3")

def s3_health_check(bucket):
    try:
        s3.head_bucket(Bucket=bucket)
        page = s3.list_objects_v2(Bucket=bucket, MaxKeys=1)
        print("S3 connection successful. Keys visible:", page.get("KeyCount", 0))
        return True
    except Exception as e:
        print("S3 check failed:", str(e))
        return False

if not s3_health_check(BUCKET_NAME):
    raise SystemExit("Fix your S3 setup before running sync.")


S3 connection successful. Keys visible: 1
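
The cell above falls back to environment variables for all settings at once if anything in the Kaggle branch fails. A per-setting fallback is one possible alternative. The sketch below is an assumption-laden illustration: `resolve_setting` and `kaggle_lookup` are hypothetical helpers, and any exception from a source is simply treated as "not available here".

```python
import os

def resolve_setting(name, sources, default=None):
    """Return the first non-empty value for `name` from an ordered list of lookups."""
    for lookup in sources:
        try:
            value = lookup(name)
        except Exception:
            continue  # this source is unavailable or has no such entry
        if value:
            return value
    if default is None:
        raise KeyError(f"missing required setting: {name}")
    return default

def kaggle_lookup(name):
    # Only importable inside a Kaggle kernel; elsewhere the import fails
    # and resolve_setting moves on to the next source.
    from kaggle_secrets import UserSecretsClient
    return UserSecretsClient().get_secret(name)

# Kaggle first, then environment variables, then the default.
sources = [kaggle_lookup, os.environ.get]
region = resolve_setting("AWS_REGION", sources, default="us-east-1")
print(region)
```

With this shape, a missing optional secret in Kaggle does not discard the secrets that were found.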


## Sync BLS Data to S3
- The script watches for new, changed, or deleted files.
- It sends a custom User-Agent header so BLS does not reject the requests.
- It compares content hashes before uploading.
- It uploads only when needed, to save time and bandwidth.
- It keeps S3 matched with the BLS site.
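
Discovering file names depends on how the directory listing is marked up. The sketch below shows one defensive way to filter links, assuming (this is not confirmed by the notebook) that a listing may contain absolute paths, upper-case `HREF` attributes, parent-directory links, or sort links. `extract_file_names` and the sample HTML are hypothetical.

```python
import re

ALLOW = re.compile(r"^[A-Za-z0-9._-]+$")                # plain file names only
HREF = re.compile(r'href="([^"]+)"', re.IGNORECASE)     # matches href= and HREF=

def extract_file_names(html):
    """Return unique file names linked from a directory-listing page."""
    names = []
    for href in HREF.findall(html):
        if href.endswith("/"):
            continue                                    # directory link, e.g. the parent folder
        name = href.rsplit("/", 1)[-1]                  # keep the last path segment
        if ALLOW.match(name) and name not in names:
            names.append(name)
    return names

# Hypothetical listing markup, for illustration only
sample = '''
<a href="/pub/time.series/">[To Parent Directory]</a>
<A HREF="/pub/time.series/pr/pr.class">pr.class</A>
<a href="pr.data.0.Current">pr.data.0.Current</a>
<a href="?C=N;O=D">Name</a>
'''
print(extract_file_names(sample))
# ['pr.class', 'pr.data.0.Current']
```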

## Get BLS Data with an API Key
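- Pass your registration key (`BLS_API_KEY`) with each request to the BLS Public API v2.
- Pull individual series, such as U.S. inflation data, straight from the BLS by listing their IDs in `SERIES_IDS`.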
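
As a rough sketch of what a keyed request body looks like, the helper below builds the JSON payload without sending it. `build_bls_payload` is a hypothetical helper, `"YOUR_KEY"` is a placeholder, and `CUUR0000SA0` (CPI-U, all items) is used only as an example series ID. The optional `startyear`/`endyear` fields are part of the v2 API but are not used elsewhere in this notebook.

```python
import json

BLS_API_URL = "https://api.bls.gov/publicAPI/v2/timeseries/data/"

def build_bls_payload(series_ids, api_key, start_year=None, end_year=None):
    """Build the JSON request body for the BLS v2 timeseries endpoint."""
    payload = {"seriesid": list(series_ids), "registrationkey": api_key}
    if start_year is not None and end_year is not None:
        # The API expects years as strings.
        payload["startyear"] = str(start_year)
        payload["endyear"] = str(end_year)
    return json.dumps(payload)

body = build_bls_payload(["CUUR0000SA0"], "YOUR_KEY", 2020, 2024)
print(body)
# To send it: requests.post(BLS_API_URL, data=body,
#                           headers={"Content-type": "application/json"}, timeout=60)
```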

In [2]:
import os, re, json, hashlib, requests, boto3, botocore
print("starting")

# AWS / env
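# Note: this client uses boto3's default credential chain (env vars or an IAM role, as in Lambda).
# In Kaggle, where keys come from kaggle_secrets, reuse the authenticated `s3` client from cell 1 instead.
S3          = boto3.client("s3", region_name=os.getenv("AWS_REGION", "us-east-1"))
# BUCKET_NAME is already defined by cell 1; uncomment for standalone use (e.g. Lambda).
#BUCKET_NAME = os.environ["BUCKET_NAME"]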
USER_AGENT  = os.getenv("USER_AGENT", "ScottSchmidt/1.0 (email)")
SERIES_IDS  = [s.strip() for s in os.getenv("SERIES_IDS", "").split(",") if s.strip()]
BLS_API_KEY = os.getenv("BLS_API_KEY")  # only needed if SERIES_IDS is set

# Bulk sync (BLS pr/ directory)
BASE_URL   = "https://download.bls.gov/pub/time.series/pr/"
BLS_PREFIX = os.getenv("BLS_PREFIX", "bls/pr/")
ALLOW      = re.compile(r"^[A-Za-z0-9._-]+$")

def md5_text(t: str) -> str:
    return hashlib.md5(t.encode("utf-8")).hexdigest()

def md5_bytes(b: bytes) -> str:
    return hashlib.md5(b).hexdigest()

def list_upstream_files():
    r = requests.get(BASE_URL, headers={"User-Agent": USER_AGENT}, timeout=30)
    r.raise_for_status()
    hrefs = re.findall(r'href="([^"]+)"', r.text)
    return [h for h in hrefs if ALLOW.match(h)]

def list_s3_keys(prefix):
    keys, paginator = [], S3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=BUCKET_NAME, Prefix=prefix):
        for obj in (page.get("Contents") or []):
            keys.append(obj["Key"])
    return keys

def sync_bulk_file(name: str):
    key = f"{BLS_PREFIX}{name}"
    r = requests.get(BASE_URL + name, headers={"User-Agent": USER_AGENT}, timeout=60)

    if r.status_code == 404:
        # The file vanished upstream. S3 delete_object succeeds even when the key
        # does not exist, so no NoSuchKey handling is needed here.
        S3.delete_object(Bucket=BUCKET_NAME, Key=key)
        print(f"deleted: {name}")
        return "deleted"

    r.raise_for_status()
    content = r.text
    new_hash = md5_text(content)

    try:
        obj = S3.get_object(Bucket=BUCKET_NAME, Key=key)
        if md5_text(obj["Body"].read().decode("utf-8")) == new_hash:
            print(f"no-change: {name}")
            return "unchanged"
        S3.put_object(Bucket=BUCKET_NAME, Key=key, Body=content, ContentType="text/plain")
        print(f"updated: {name}")
        return "updated"
    except botocore.exceptions.ClientError as e:
        if e.response["Error"]["Code"] == "NoSuchKey":
            S3.put_object(Bucket=BUCKET_NAME, Key=key, Body=content, ContentType="text/plain")
            print(f"added: {name}")
            return "added"
        raise

def run_bulk_full_sync():
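    upstream = set(list_upstream_files())
    if not upstream:
        # An empty listing would make every S3 object look stale and delete it.
        raise RuntimeError("upstream listing is empty; aborting to avoid deleting all synced files")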
    s3_files = set(k[len(BLS_PREFIX):] for k in list_s3_keys(BLS_PREFIX) if k.startswith(BLS_PREFIX))

    added = updated = unchanged = 0
    for name in sorted(upstream):
        res = sync_bulk_file(name)
        if res == "added": added += 1
        elif res == "updated": updated += 1
        elif res == "unchanged": unchanged += 1

    extras = s3_files - upstream
    deleted = 0
    for name in sorted(extras):
        S3.delete_object(Bucket=BUCKET_NAME, Key=f"{BLS_PREFIX}{name}")
        print(f"deleted-extra: {name}")
        deleted += 1

    return {"added": added, "updated": updated, "unchanged": unchanged, "deleted": deleted}

def run_series_sync(series_ids):
    if not series_ids:
        return {"upserted": 0, "unchanged": 0, "skipped": True}
    if not BLS_API_KEY:
        return {"error": "BLS_API_KEY missing", "upserted": 0, "unchanged": 0}

    upserted = unchanged = 0
    for sid in series_ids:
        print(f"series: {sid}")
        payload = json.dumps({"seriesid": [sid], "registrationkey": BLS_API_KEY})
        r = requests.post(
            "https://api.bls.gov/publicAPI/v2/timeseries/data/",
            data=payload,
            headers={"Content-type": "application/json"},
            timeout=60,
        )
        if r.status_code != 200:
            print(f"series-fail: {sid} {r.status_code}")
            continue

        content = r.text.encode("utf-8")
        key = f"bls/series/{sid}.json"

        try:
            obj = S3.get_object(Bucket=BUCKET_NAME, Key=key)
            if md5_bytes(obj["Body"].read()) == md5_bytes(content):
                print(f"series-no-change: {sid}")
                unchanged += 1
                continue
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] != "NoSuchKey":
                raise

        S3.put_object(Bucket=BUCKET_NAME, Key=key, Body=content, ContentType="application/json")
        print(f"series-upserted: {sid}")
        upserted += 1

    return {"upserted": upserted, "unchanged": unchanged}

def lambda_handler(event, context):
    print("start")
    bulk   = run_bulk_full_sync()
    series = run_series_sync(SERIES_IDS)
    summary = {"bulk": bulk, "series": series}
    print(json.dumps(summary))
    return {"statusCode": 200, "body": json.dumps(summary)}
# This cell only defines the functions; call lambda_handler({}, None) to run a sync.
print("Done.")

starting
Done.
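
`sync_bulk_file` downloads the stored object in order to hash it. A lighter check is possible by comparing against the object's ETag from `head_object`, under the assumption that objects were written with a single-part `put_object` and without SSE-KMS (only then is the ETag the MD5 of the content). The sketch below is an alternative, not what the notebook does. `needs_upload` and `StubS3` are hypothetical, and the stub stands in for a real boto3 client so the example runs offline.

```python
import hashlib

def needs_upload(s3_client, bucket, key, body):
    """Return True when the key is missing or its ETag differs from MD5(body)."""
    try:
        head = s3_client.head_object(Bucket=bucket, Key=key)
    except Exception as exc:
        # boto3 raises botocore.exceptions.ClientError with code "404" for a missing key.
        response = getattr(exc, "response", None) or {}
        if response.get("Error", {}).get("Code") in ("404", "NoSuchKey", "NotFound"):
            return True
        raise
    # ETag values are wrapped in double quotes.
    return head["ETag"].strip('"') != hashlib.md5(body).hexdigest()

class StubS3:
    """Hypothetical in-memory stand-in for a boto3 S3 client (demo only)."""
    def __init__(self, objects):
        self.objects = objects
    def head_object(self, Bucket, Key):
        if Key not in self.objects:
            error = Exception("Not Found")
            error.response = {"Error": {"Code": "404"}}
            raise error
        digest = hashlib.md5(self.objects[Key]).hexdigest()
        return {"ETag": f'"{digest}"'}

stub = StubS3({"bls/pr/pr.class": b"same"})
print(needs_upload(stub, "my-bucket", "bls/pr/pr.class", b"same"))     # False
print(needs_upload(stub, "my-bucket", "bls/pr/pr.class", b"changed"))  # True
print(needs_upload(stub, "my-bucket", "bls/pr/pr.new", b"data"))       # True
```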


## Preview Synced BLS Data
Load the full set of BLS JSON files from S3 (kept in sync with the source) into a single DataFrame for analysis.

In [3]:
import pandas as pd
import json

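# List the series JSON files written by run_series_sync. They are stored under
# "bls/series/", not BLS_PREFIX (which holds the bulk text files).
# Note: a single list_objects_v2 call returns at most 1,000 keys.
objs = s3.list_objects_v2(Bucket=BUCKET_NAME, Prefix="bls/series/").get("Contents", [])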

frames = []
for o in objs:
    key = o["Key"]
    if not key.endswith(".json"):
        continue

    obj = s3.get_object(Bucket=BUCKET_NAME, Key=key)
    content = obj["Body"].read().decode("utf-8")
    data = json.loads(content)

    series = data.get("Results", {}).get("series")
    if not series:
        continue

    records = series[0].get("data", [])
    if not records:
        continue

    df = pd.DataFrame(records)
    cols = [c for c in ["year", "period", "value", "footnotes"] if c in df.columns]
    df = df[cols]
    df["source_file"] = key
    frames.append(df)

# one DataFrame with all series files
all_data = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

print("rows:", len(all_data))
all_data.head(20)


rows: 0
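
To make the expected JSON shape concrete, the sketch below flattens a BLS-style response into typed rows without touching S3 or pandas. `series_to_rows` is a hypothetical helper and the sample payload uses made-up illustrative values. It assumes monthly periods are coded `M01` to `M12`, with `M13` denoting an annual average. Unlike the preview cell, it walks every series in the response rather than only the first.

```python
def to_float(text):
    """Convert a value string to float, returning None for placeholders such as '-'."""
    try:
        return float(text)
    except (TypeError, ValueError):
        return None

def series_to_rows(payload):
    """Flatten {"Results": {"series": [...]}} into a list of row dicts."""
    rows = []
    for series in payload.get("Results", {}).get("series", []):
        sid = series.get("seriesID")
        for item in series.get("data", []):
            period = item.get("period", "")
            is_month = period.startswith("M") and period != "M13"
            rows.append({
                "series_id": sid,
                "year": int(item["year"]),
                "month": int(period[1:]) if is_month else None,
                "value": to_float(item.get("value")),
            })
    return rows

# Illustrative payload with made-up values
sample = {"Results": {"series": [{
    "seriesID": "CUUR0000SA0",
    "data": [
        {"year": "2024", "period": "M02", "value": "310.0"},
        {"year": "2024", "period": "M13", "value": "-"},
    ],
}]}}
rows = series_to_rows(sample)
print(rows)
```

The resulting list can be passed directly to `pd.DataFrame(rows)` if a DataFrame is wanted.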
